http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
new file mode 100644
index 0000000..06664de
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.rle;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+
+/**
+ * Checks the exact byte layout written by {@link RunLengthBitPackingHybridEncoder}.
+ *
+ * <p>The tests decode the output by hand. Every run starts with an unsigned varint
+ * header whose low bit selects the run type. An RLE run has header {@code count << 1}
+ * and is followed by the repeated value in little-endian bytes (one byte at bit
+ * widths 3 and 5, two at width 9, none at width 0). A bit-packed run has header
+ * {@code (numGroups << 1) | 1} and is followed by {@code numGroups} groups of 8
+ * values, each group taking {@code bitWidth} bytes; a final partial group is
+ * padded with zeros.
+ *
+ * @author Alex Levenson
+ */
+public class TestRunLengthBitPackingHybridEncoder {
+
+  @Test
+  public void testRLEOnly() throws Exception {
+    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10);
+    for (int i = 0; i < 100; i++) {
+      encoder.writeInt(4);
+    }
+    for (int i = 0; i < 100; i++) {
+      encoder.writeInt(5);
+    }
+
+    ByteArrayInputStream is = new ByteArrayInputStream(encoder.toBytes().toByteArray());
+
+    // header = 100 << 1 = 200
+    assertEquals(200, BytesUtils.readUnsignedVarInt(is));
+    // payload = 4
+    assertEquals(4, BytesUtils.readIntLittleEndianOnOneByte(is));
+
+    // header = 100 << 1 = 200
+    assertEquals(200, BytesUtils.readUnsignedVarInt(is));
+    // payload = 5
+    assertEquals(5, BytesUtils.readIntLittleEndianOnOneByte(is));
+
+    // end of stream
+    assertEquals(-1, is.read());
+  }
+
+  @Test
+  public void testRepeatedZeros() throws Exception {
+    // previousValue is initialized to 0
+    // make sure that repeated 0s at the beginning
+    // of the stream don't trip up the repeat count
+
+    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10);
+    for (int i = 0; i < 10; i++) {
+      encoder.writeInt(0);
+    }
+
+    ByteArrayInputStream is = new ByteArrayInputStream(encoder.toBytes().toByteArray());
+
+    // header = 10 << 1 = 20
+    assertEquals(20, BytesUtils.readUnsignedVarInt(is));
+    // payload = 0
+    assertEquals(0, BytesUtils.readIntLittleEndianOnOneByte(is));
+
+    // end of stream
+    assertEquals(-1, is.read());
+  }
+
+  @Test
+  public void testBitWidthZero() throws Exception {
+    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(0, 5, 10);
+    for (int i = 0; i < 10; i++) {
+      encoder.writeInt(0);
+    }
+
+    ByteArrayInputStream is = new ByteArrayInputStream(encoder.toBytes().toByteArray());
+
+    // header = 10 << 1 = 20; a zero-width value needs no payload bytes
+    assertEquals(20, BytesUtils.readUnsignedVarInt(is));
+
+    // end of stream
+    assertEquals(-1, is.read());
+  }
+
+  @Test
+  public void testBitPackingOnly() throws Exception {
+    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10);
+
+    for (int i = 0; i < 100; i++) {
+      encoder.writeInt(i % 3);
+    }
+
+    ByteArrayInputStream is = new ByteArrayInputStream(encoder.toBytes().toByteArray());
+
+    // the 100 values are padded up to 104 (13 full groups of 8)
+    // header = ((104/8) << 1) | 1 = 27
+    assertEquals(27, BytesUtils.readUnsignedVarInt(is));
+
+    List<Integer> values = unpack(3, 104, is);
+
+    for (int i = 0; i < 100; i++) {
+      assertEquals(i % 3, (int) values.get(i));
+    }
+
+    // end of stream
+    assertEquals(-1, is.read());
+  }
+
+  @Test
+  public void testBitPackingOverflow() throws Exception {
+    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10);
+
+    for (int i = 0; i < 1000; i++) {
+      encoder.writeInt(i % 3);
+    }
+
+    ByteArrayInputStream is = new ByteArrayInputStream(encoder.toBytes().toByteArray());
+
+    // 504 is the max number of values in a bit packed run
+    // that still has a header of 1 byte
+    // header = ((504/8) << 1) | 1 = 127
+    assertEquals(127, BytesUtils.readUnsignedVarInt(is));
+    List<Integer> values = unpack(3, 504, is);
+
+    for (int i = 0; i < 504; i++) {
+      assertEquals(i % 3, (int) values.get(i));
+    }
+
+    // there should now be 496 values in another bit-packed run
+    // header = ((496/8) << 1) | 1 = 125
+    assertEquals(125, BytesUtils.readUnsignedVarInt(is));
+    values = unpack(3, 496, is);
+    for (int i = 0; i < 496; i++) {
+      assertEquals((i + 504) % 3, (int) values.get(i));
+    }
+
+    // end of stream
+    assertEquals(-1, is.read());
+  }
+
+  @Test
+  public void testTransitionFromBitPackingToRle() throws Exception {
+    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10);
+
+    // 5 obviously bit-packed values
+    encoder.writeInt(0);
+    encoder.writeInt(1);
+    encoder.writeInt(0);
+    encoder.writeInt(1);
+    encoder.writeInt(0);
+
+    // three repeated values, that ought to be bit-packed as well
+    encoder.writeInt(2);
+    encoder.writeInt(2);
+    encoder.writeInt(2);
+
+    // lots more repeated values, that should be rle-encoded
+    for (int i = 0; i < 100; i++) {
+      encoder.writeInt(2);
+    }
+
+    ByteArrayInputStream is = new ByteArrayInputStream(encoder.toBytes().toByteArray());
+
+    // header = ((8/8) << 1) | 1 = 3
+    assertEquals(3, BytesUtils.readUnsignedVarInt(is));
+
+    List<Integer> values = unpack(3, 8, is);
+    assertEquals(Arrays.asList(0, 1, 0, 1, 0, 2, 2, 2), values);
+
+    // header = 100 << 1 = 200
+    assertEquals(200, BytesUtils.readUnsignedVarInt(is));
+    // payload = 2
+    assertEquals(2, BytesUtils.readIntLittleEndianOnOneByte(is));
+
+    // end of stream
+    assertEquals(-1, is.read());
+  }
+
+  @Test
+  public void testPaddingZerosOnUnfinishedBitPackedRuns() throws Exception {
+    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(5, 5, 10);
+    for (int i = 0; i < 9; i++) {
+      encoder.writeInt(i + 1);
+    }
+
+    ByteArrayInputStream is = new ByteArrayInputStream(encoder.toBytes().toByteArray());
+
+    // header = ((16/8) << 1) | 1 = 5
+    assertEquals(5, BytesUtils.readUnsignedVarInt(is));
+
+    List<Integer> values = unpack(5, 16, is);
+
+    // the unfinished second group of 8 is padded with zeros
+    assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, 0), values);
+
+    // end of stream
+    assertEquals(-1, is.read());
+  }
+
+  @Test
+  public void testSwitchingModes() throws Exception {
+    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(9, 100, 1000);
+
+    // rle first
+    for (int i = 0; i < 25; i++) {
+      encoder.writeInt(17);
+    }
+
+    // bit-packing
+    for (int i = 0; i < 7; i++) {
+      encoder.writeInt(7);
+    }
+
+    encoder.writeInt(8);
+    encoder.writeInt(9);
+    encoder.writeInt(10);
+
+    // bit-packing followed by rle
+    for (int i = 0; i < 25; i++) {
+      encoder.writeInt(6);
+    }
+
+    // followed by a different rle
+    for (int i = 0; i < 8; i++) {
+      encoder.writeInt(5);
+    }
+
+    ByteArrayInputStream is = new ByteArrayInputStream(encoder.toBytes().toByteArray());
+
+    // header = 25 << 1 = 50
+    assertEquals(50, BytesUtils.readUnsignedVarInt(is));
+    // payload = 17, stored in 2 bytes
+    assertEquals(17, BytesUtils.readIntLittleEndianOnTwoBytes(is));
+
+    // header = ((16/8) << 1) | 1 = 5
+    assertEquals(5, BytesUtils.readUnsignedVarInt(is));
+    List<Integer> values = unpack(9, 16, is);
+    int v = 0;
+    for (int i = 0; i < 7; i++) {
+      assertEquals(7, (int) values.get(v));
+      v++;
+    }
+
+    assertEquals(8, (int) values.get(v++));
+    assertEquals(9, (int) values.get(v++));
+    assertEquals(10, (int) values.get(v++));
+
+    // the first 6 of the 25 sixes fill out the second bit-packed group
+    for (int i = 0; i < 6; i++) {
+      assertEquals(6, (int) values.get(v));
+      v++;
+    }
+
+    // header = 19 << 1 = 38
+    assertEquals(38, BytesUtils.readUnsignedVarInt(is));
+    // payload = 6, stored in 2 bytes
+    assertEquals(6, BytesUtils.readIntLittleEndianOnTwoBytes(is));
+
+    // header = 8 << 1 = 16
+    assertEquals(16, BytesUtils.readUnsignedVarInt(is));
+    // payload = 5, stored in 2 bytes
+    assertEquals(5, BytesUtils.readIntLittleEndianOnTwoBytes(is));
+
+    // end of stream
+    assertEquals(-1, is.read());
+  }
+
+  /**
+   * Decodes a hand-built stream holding one bit-packed group at bit width 2 whose
+   * payload is a single byte, although a full group of 8 two-bit values would take
+   * two bytes. Presumably this checks that the decoder copes with a truncated final
+   * group; the last assertion checks that the whole stream was consumed.
+   */
+  @Test
+  public void testGroupBoundary() throws Exception {
+    byte[] bytes = new byte[2];
+    // Create an RLE byte stream that has 3 values (1 literal group) with
+    // bit width 2.
+    // header = (1 << 1) | 1 = 3: one bit-packed group
+    bytes[0] = (1 << 1) | 1;
+    // values 1, 2 and 3 packed into bits 0-1, 2-3 and 4-5
+    bytes[1] = (1 << 0) | (2 << 2) | (3 << 4);
+    ByteArrayInputStream stream = new ByteArrayInputStream(bytes);
+    RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(2, stream);
+    assertEquals(1, decoder.readInt());
+    assertEquals(2, decoder.readInt());
+    assertEquals(3, decoder.readInt());
+    assertEquals(0, stream.available());
+  }
+
+  /**
+   * Reads bit-packed groups of 8 values from {@code is} until at least
+   * {@code numValues} values have been collected. Each group occupies exactly
+   * {@code bitWidth} bytes and is decoded with the little-endian byte packer.
+   *
+   * @param bitWidth number of bits per packed value
+   * @param numValues number of values to read; all callers pass a multiple of 8,
+   *     otherwise the last group would be returned in full
+   * @param is stream positioned at the first byte of packed data
+   * @return the decoded values, in stream order
+   * @throws EOFException if the stream ends before a whole group has been read;
+   *     without this check a truncated stream would be decoded as 0xFF bytes
+   */
+  private static List<Integer> unpack(int bitWidth, int numValues, ByteArrayInputStream is)
+      throws Exception {
+    BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
+    int[] unpacked = new int[8];
+    byte[] next8Values = new byte[bitWidth];
+
+    List<Integer> values = new ArrayList<Integer>(numValues);
+
+    while (values.size() < numValues) {
+      for (int i = 0; i < bitWidth; i++) {
+        int b = is.read();
+        if (b < 0) {
+          throw new EOFException("bit-packed data ended after " + values.size()
+              + " of " + numValues + " values");
+        }
+        next8Values[i] = (byte) b;
+      }
+
+      packer.unpack8Values(next8Values, 0, unpacked, 0);
+
+      for (int v = 0; v < 8; v++) {
+        values.add(unpacked[v]);
+      }
+    }
+
+    return values;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/DummyUdp.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/DummyUdp.java b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/DummyUdp.java
new file mode 100644
index 0000000..d149224
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/DummyUdp.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.predicate;
+
+/**
+ * A no-op {@link UserDefinedPredicate} over {@code Integer} values, used as a
+ * placeholder predicate by the filter2 tests.
+ *
+ * <p>{@link #keep} rejects every value, and both {@link #canDrop} and
+ * {@link #inverseCanDrop} answer {@code false} for any statistics.
+ */
+public class DummyUdp extends UserDefinedPredicate<Integer> {
+
+  @Override
+  public boolean canDrop(Statistics<Integer> stats) {
+    return false;
+  }
+
+  @Override
+  public boolean inverseCanDrop(Statistics<Integer> stats) {
+    return false;
+  }
+
+  @Override
+  public boolean keep(Integer ignored) {
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestFilterApiMethods.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestFilterApiMethods.java b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestFilterApiMethods.java
new file mode 100644
index 0000000..009da1c
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestFilterApiMethods.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.predicate;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.junit.Test;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
+import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
+import org.apache.parquet.filter2.predicate.Operators.Eq;
+import org.apache.parquet.filter2.predicate.Operators.Gt;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.filter2.predicate.Operators.LongColumn;
+import org.apache.parquet.filter2.predicate.Operators.Not;
+import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.UserDefined;
+import org.apache.parquet.filter2.predicate.Operators.UserDefinedByClass;
+import org.apache.parquet.io.api.Binary;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.not;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
+import static org.apache.parquet.filter2.predicate.Operators.NotEq;
+
+/**
+ * Tests predicate construction through {@link FilterApi}: the shape of the
+ * resulting operator tree, its {@code toString()} form, user-defined predicates
+ * given by class or by instance, and Java serialization round-trips.
+ */
+public class TestFilterApiMethods {
+
+  private static final IntColumn intColumn = intColumn("a.b.c");
+  private static final LongColumn longColumn = longColumn("a.b.l");
+  private static final DoubleColumn doubleColumn = doubleColumn("x.y.z");
+  private static final BinaryColumn binColumn = binaryColumn("a.string.column");
+
+  // and(not(or(a.b.c == 7, a.b.c != 17)), x.y.z > 100.0)
+  private static final FilterPredicate predicate =
+      and(not(or(eq(intColumn, 7), notEq(intColumn, 17))), gt(doubleColumn, 100.0));
+
+  @Test
+  public void testFilterPredicateCreation() {
+    FilterPredicate outerAnd = predicate;
+
+    assertTrue(outerAnd instanceof And);
+
+    FilterPredicate not = ((And) outerAnd).getLeft();
+    FilterPredicate gt = ((And) outerAnd).getRight();
+    assertTrue(not instanceof Not);
+
+    FilterPredicate or = ((Not) not).getPredicate();
+    assertTrue(or instanceof Or);
+
+    FilterPredicate leftEq = ((Or) or).getLeft();
+    FilterPredicate rightNotEq = ((Or) or).getRight();
+    assertTrue(leftEq instanceof Eq);
+    assertTrue(rightNotEq instanceof NotEq);
+    assertEquals(7, ((Eq) leftEq).getValue());
+    assertEquals(17, ((NotEq) rightNotEq).getValue());
+    assertEquals(ColumnPath.get("a", "b", "c"), ((Eq) leftEq).getColumn().getColumnPath());
+    assertEquals(ColumnPath.get("a", "b", "c"), ((NotEq) rightNotEq).getColumn().getColumnPath());
+
+    assertTrue(gt instanceof Gt);
+    assertEquals(100.0, ((Gt) gt).getValue());
+    assertEquals(ColumnPath.get("x", "y", "z"), ((Gt) gt).getColumn().getColumnPath());
+  }
+
+  @Test
+  public void testToString() {
+    FilterPredicate pred = or(predicate, notEq(binColumn, Binary.fromString("foobarbaz")));
+    assertEquals("or(and(not(or(eq(a.b.c, 7), noteq(a.b.c, 17))), gt(x.y.z, 100.0)), "
+        + "noteq(a.string.column, Binary{\"foobarbaz\"}))",
+        pred.toString());
+  }
+
+  @Test
+  public void testUdp() {
+    FilterPredicate udpOr = or(eq(doubleColumn, 12.0), userDefined(intColumn, DummyUdp.class));
+    assertTrue(udpOr instanceof Or);
+    FilterPredicate ud = ((Or) udpOr).getRight();
+    assertTrue(ud instanceof UserDefinedByClass);
+    assertEquals(DummyUdp.class, ((UserDefinedByClass) ud).getUserDefinedPredicateClass());
+    assertTrue(((UserDefined) ud).getUserDefinedPredicate() instanceof DummyUdp);
+  }
+
+  @Test
+  public void testSerializable() throws Exception {
+    BinaryColumn binary = binaryColumn("foo");
+    FilterPredicate p = and(
+        or(and(userDefined(intColumn, DummyUdp.class), predicate), eq(binary, Binary.fromString("hi"))),
+        userDefined(longColumn, new IsMultipleOf(7)));
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ObjectOutputStream oos = new ObjectOutputStream(baos);
+    try {
+      oos.writeObject(p);
+    } finally {
+      oos.close();
+    }
+
+    ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    FilterPredicate read;
+    try {
+      read = (FilterPredicate) ois.readObject();
+    } finally {
+      ois.close();
+    }
+    assertEquals(p, read);
+  }
+
+  /**
+   * A serializable user-defined predicate that keeps non-null values divisible by
+   * a fixed divisor. It defines value-based equals/hashCode so that a predicate
+   * tree containing it can be compared with its deserialized copy.
+   */
+  public static class IsMultipleOf extends UserDefinedPredicate<Long> implements Serializable {
+
+    private final long of;
+
+    /**
+     * @param of the divisor; must be non-zero because {@link #keep} computes
+     *     {@code value % of}
+     * @throws IllegalArgumentException if {@code of} is zero
+     */
+    public IsMultipleOf(long of) {
+      if (of == 0) {
+        throw new IllegalArgumentException("divisor must be non-zero");
+      }
+      this.of = of;
+    }
+
+    @Override
+    public boolean keep(Long value) {
+      if (value == null) {
+        return false;
+      }
+      return value % of == 0;
+    }
+
+    @Override
+    public boolean canDrop(Statistics<Long> statistics) {
+      return false;
+    }
+
+    @Override
+    public boolean inverseCanDrop(Statistics<Long> statistics) {
+      return false;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      IsMultipleOf that = (IsMultipleOf) o;
+      return this.of == that.of;
+    }
+
+    @Override
+    public int hashCode() {
+      // same value as new Long(of).hashCode(), without allocating a boxed Long
+      return (int) (of ^ (of >>> 32));
+    }
+
+    @Override
+    public String toString() {
+      return "IsMultipleOf(" + of + ")";
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverseRewriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverseRewriter.java b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverseRewriter.java
new file mode 100644
index 0000000..f2f7745
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverseRewriter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.predicate;
+
+import org.junit.Test;
+
+import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import org.apache.parquet.filter2.predicate.Operators.UserDefined;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.not;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
+import static org.apache.parquet.filter2.predicate.LogicalInverseRewriter.rewrite;
+
+/**
+ * Tests {@link LogicalInverseRewriter#rewrite}. Predicates without a {@code not(...)}
+ * must come back unchanged; a negated comparison must become the opposite comparison,
+ * a negated {@code and} must be distributed into an {@code or} of negations, and a
+ * negated user-defined predicate must become a {@link LogicalNotUserDefined}.
+ */
+public class TestLogicalInverseRewriter {
+  private static final IntColumn intColumn = intColumn("a.b.c");
+  private static final DoubleColumn doubleColumn = doubleColumn("a.b.c");
+
+  // a nested tree mixing not/and/or with a user-defined predicate
+  private static final FilterPredicate complex =
+      and(
+          not(
+              or(ltEq(doubleColumn, 12.0),
+                  and(
+                      not(or(eq(intColumn, 7), notEq(intColumn, 17))),
+                      userDefined(intColumn, DummyUdp.class)))),
+          or(gt(doubleColumn, 100.0), not(gtEq(intColumn, 77))));
+
+  // what rewrite(complex) should produce: the same logic with no not(...) nodes left
+  private static final FilterPredicate complexCollapsed =
+      and(
+          and(gt(doubleColumn, 12.0),
+              or(
+                  or(eq(intColumn, 7), notEq(intColumn, 17)),
+                  new LogicalNotUserDefined<Integer, DummyUdp>(userDefined(intColumn, DummyUdp.class)))),
+          or(gt(doubleColumn, 100.0), lt(intColumn, 77)));
+
+  /** Asserts that rewriting {@code input} yields {@code expected}. */
+  private static void assertRewritesTo(FilterPredicate expected, FilterPredicate input) {
+    assertEquals(expected, rewrite(input));
+  }
+
+  /** Asserts that {@code p} is left unchanged by the rewrite. */
+  private static void assertNoOp(FilterPredicate p) {
+    assertRewritesTo(p, p);
+  }
+
+  @Test
+  public void testBaseCases() {
+    UserDefined<Integer, DummyUdp> udp = userDefined(intColumn, DummyUdp.class);
+
+    // predicates without a not(...) pass through untouched
+    assertNoOp(eq(intColumn, 17));
+    assertNoOp(notEq(intColumn, 17));
+    assertNoOp(lt(intColumn, 17));
+    assertNoOp(ltEq(intColumn, 17));
+    assertNoOp(gt(intColumn, 17));
+    assertNoOp(gtEq(intColumn, 17));
+    assertNoOp(and(eq(intColumn, 17), eq(doubleColumn, 12.0)));
+    assertNoOp(or(eq(intColumn, 17), eq(doubleColumn, 12.0)));
+    assertNoOp(udp);
+
+    // a single not(...) is folded into the opposite operator
+    assertRewritesTo(notEq(intColumn, 17), not(eq(intColumn, 17)));
+    assertRewritesTo(eq(intColumn, 17), not(notEq(intColumn, 17)));
+    assertRewritesTo(gtEq(intColumn, 17), not(lt(intColumn, 17)));
+    assertRewritesTo(gt(intColumn, 17), not(ltEq(intColumn, 17)));
+    assertRewritesTo(ltEq(intColumn, 17), not(gt(intColumn, 17)));
+    assertRewritesTo(lt(intColumn, 17), not(gtEq(intColumn, 17)));
+    assertRewritesTo(new LogicalNotUserDefined<Integer, DummyUdp>(udp), not(udp));
+
+    // not(a and b) is distributed into (not a) or (not b)
+    FilterPredicate negatedAnd = not(and(eq(intColumn, 17), eq(doubleColumn, 12.0)));
+    FilterPredicate deMorganOr = or(notEq(intColumn, 17), notEq(doubleColumn, 12.0));
+    assertRewritesTo(deMorganOr, negatedAnd);
+
+    // a not(...) nested inside an and is rewritten in place
+    FilterPredicate andWithNot = and(not(gtEq(intColumn, 17)), lt(intColumn, 7));
+    FilterPredicate andWithoutNot = and(lt(intColumn, 17), lt(intColumn, 7));
+    assertRewritesTo(andWithoutNot, andWithNot);
+  }
+
+  @Test
+  public void testComplex() {
+    assertRewritesTo(complexCollapsed, complex);
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverter.java b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverter.java
new file mode 100644
index 0000000..6b8da78
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.predicate;
+
+import org.junit.Test;
+
+import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import org.apache.parquet.filter2.predicate.Operators.UserDefined;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.not;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
+import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
+
+/**
+ * Tests {@link LogicalInverter#invert}, which is expected to return the logical
+ * negation of a predicate without introducing {@code not(...)} nodes: comparisons
+ * flip to their opposite, {@code and}/{@code or} swap per De Morgan's laws,
+ * {@code not(p)} inverts to {@code p}, and a user-defined predicate toggles to or
+ * from {@link LogicalNotUserDefined}.
+ */
+public class TestLogicalInverter {
+  private static final IntColumn intColumn = intColumn("a.b.c");
+  private static final DoubleColumn doubleColumn = doubleColumn("a.b.c");
+
+  private static final UserDefined<Integer, DummyUdp> ud = userDefined(intColumn, DummyUdp.class);
+
+  private static final FilterPredicate complex =
+      and(
+          or(ltEq(doubleColumn, 12.0),
+              and(
+                  not(or(eq(intColumn, 7), notEq(intColumn, 17))),
+                  userDefined(intColumn, DummyUdp.class))),
+          or(gt(doubleColumn, 100.0), notEq(intColumn, 77)));
+
+  // invert(complex): and/or swap, comparisons flip, not(x) becomes x, udp becomes its logical not
+  private static final FilterPredicate complexInverse =
+      or(
+          and(gt(doubleColumn, 12.0),
+              or(
+                  or(eq(intColumn, 7), notEq(intColumn, 17)),
+                  new LogicalNotUserDefined<Integer, DummyUdp>(userDefined(intColumn, DummyUdp.class)))),
+          and(ltEq(doubleColumn, 100.0), eq(intColumn, 77)));
+
+  @Test
+  public void testBaseCases() {
+    assertEquals(notEq(intColumn, 17), invert(eq(intColumn, 17)));
+    assertEquals(eq(intColumn, 17), invert(notEq(intColumn, 17)));
+    assertEquals(gtEq(intColumn, 17), invert(lt(intColumn, 17)));
+    assertEquals(gt(intColumn, 17), invert(ltEq(intColumn, 17)));
+    assertEquals(ltEq(intColumn, 17), invert(gt(intColumn, 17)));
+    assertEquals(lt(intColumn, 17), invert(gtEq(intColumn, 17)));
+
+    FilterPredicate andPos = and(eq(intColumn, 17), eq(doubleColumn, 12.0));
+    FilterPredicate andInv = or(notEq(intColumn, 17), notEq(doubleColumn, 12.0));
+    assertEquals(andInv, invert(andPos));
+
+    FilterPredicate orPos = or(eq(intColumn, 17), eq(doubleColumn, 12.0));
+    FilterPredicate orInv = and(notEq(intColumn, 17), notEq(doubleColumn, 12.0));
+    // invert must map or -> and; inverting the result must give back the original
+    assertEquals(orInv, invert(orPos));
+    assertEquals(orPos, invert(orInv));
+
+    assertEquals(eq(intColumn, 17), invert(not(eq(intColumn, 17))));
+
+    assertEquals(new LogicalNotUserDefined<Integer, DummyUdp>(ud), invert(ud));
+    assertEquals(ud, invert(not(ud)));
+    assertEquals(ud, invert(new LogicalNotUserDefined<Integer, DummyUdp>(ud)));
+  }
+
+  @Test
+  public void testComplex() {
+    assertEquals(complexInverse, invert(complex));
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java
new file mode 100644
index 0000000..455eae4
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.predicate;
+
+import org.junit.Test;
+
+import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.filter2.predicate.Operators.LongColumn;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.not;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
+import static org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate;
+
+/**
+ * Tests for {@link SchemaCompatibilityValidator#validate}: column types declared in a
+ * {@link FilterPredicate} must match the schema, a column may not be declared with two
+ * different types in one predicate, and repeated columns are rejected.
+ */
+public class TestSchemaCompatibilityValidator {
+  private static final BinaryColumn stringC = binaryColumn("c");
+  // x.bar is int32 in the schema below; longBar declares it as a long on purpose
+  private static final LongColumn longBar = longColumn("x.bar");
+  private static final IntColumn intBar = intColumn("x.bar");
+  private static final LongColumn lotsOfLongs = longColumn("lotsOfLongs");
+
+  private static final String schemaString =
+      "message Document {\n"
+      + " required int32 a;\n"
+      + " required binary b;\n"
+      + " required binary c (UTF8);\n"
+      + " required group x { required int32 bar; }\n"
+      + " repeated int64 lotsOfLongs;\n"
+      + "}\n";
+
+  private static final MessageType schema = MessageTypeParser.parseMessageType(schemaString);
+
+  /** Every column is used with the type the schema declares for it. */
+  private static final FilterPredicate complexValid =
+      and(
+          or(ltEq(stringC, Binary.fromString("foo")),
+              and(
+                  not(or(eq(intBar, 17), notEq(intBar, 17))),
+                  userDefined(intBar, DummyUdp.class))),
+          or(gt(stringC, Binary.fromString("bar")), notEq(stringC, Binary.fromString("baz"))));
+
+  /**
+   * Long-typed user-defined predicate: keeps no values and never reports that it can
+   * drop based on statistics. Only its type parameter matters to these tests.
+   */
+  static class LongDummyUdp extends UserDefinedPredicate<Long> {
+    @Override
+    public boolean keep(Long value) {
+      return false;
+    }
+
+    @Override
+    public boolean canDrop(Statistics<Long> statistics) {
+      return false;
+    }
+
+    @Override
+    public boolean inverseCanDrop(Statistics<Long> statistics) {
+      return false;
+    }
+  }
+
+  /** Same shape as complexValid, but x.bar is consistently declared as a long. */
+  private static final FilterPredicate complexWrongType =
+      and(
+          or(ltEq(stringC, Binary.fromString("foo")),
+              and(
+                  not(or(eq(longBar, 17L), notEq(longBar, 17L))),
+                  userDefined(longBar, LongDummyUdp.class))),
+          or(gt(stringC, Binary.fromString("bar")), notEq(stringC, Binary.fromString("baz"))));
+
+  /** Declares x.bar both as an int (intBar) and as a long (longBar) in one predicate. */
+  private static final FilterPredicate complexMixedType =
+      and(
+          or(ltEq(stringC, Binary.fromString("foo")),
+              and(
+                  not(or(eq(intBar, 17), notEq(longBar, 17L))),
+                  userDefined(longBar, LongDummyUdp.class))),
+          or(gt(stringC, Binary.fromString("bar")), notEq(stringC, Binary.fromString("baz"))));
+
+  @Test
+  public void testValidType() {
+    validate(complexValid, schema);
+  }
+
+  @Test
+  public void testFindsInvalidTypes() {
+    try {
+      validate(complexWrongType, schema);
+      fail("this should throw");
+    } catch (IllegalArgumentException e) {
+      assertEquals("FilterPredicate column: x.bar's declared type (java.lang.Long) does not match the schema found in file metadata. "
+          + "Column x.bar is of type: FullTypeDescriptor(PrimitiveType: INT32, OriginalType: null)\n"
+          + "Valid types for this column are: [class java.lang.Integer]", e.getMessage());
+    }
+  }
+
+  @Test
+  public void testTwiceDeclaredColumn() {
+    // a single, correctly typed use of a column is accepted
+    validate(eq(stringC, Binary.fromString("larry")), schema);
+
+    try {
+      validate(complexMixedType, schema);
+      fail("this should throw");
+    } catch (IllegalArgumentException e) {
+      assertEquals("Column: x.bar was provided with different types in the same predicate. Found both: (class java.lang.Integer, class java.lang.Long)", e.getMessage());
+    }
+  }
+
+  @Test
+  public void testRepeatedNotSupported() {
+    try {
+      // upper-case L suffix: a lower-case 'l' is easily misread as the digit 1
+      validate(eq(lotsOfLongs, 10L), schema);
+      fail("this should throw");
+    } catch (IllegalArgumentException e) {
+      assertEquals("FilterPredicates do not currently support repeated columns. Column lotsOfLongs is repeated.", e.getMessage());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestValidTypeMap.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestValidTypeMap.java b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestValidTypeMap.java
new file mode 100644
index 0000000..8da008f
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestValidTypeMap.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.predicate;
+
+import org.junit.Test;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
+import org.apache.parquet.filter2.predicate.Operators.BooleanColumn;
+import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
+import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.filter2.predicate.Operators.LongColumn;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.booleanColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.floatColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.filter2.predicate.ValidTypeMap.assertTypeValid;
+
+/**
+ * Tests {@link ValidTypeMap#assertTypeValid}: accepted column-type / schema-type pairs and
+ * the error messages for mismatched or entirely unsupported column types.
+ */
+public class TestValidTypeMap {
+  // shared fixtures; final so no test can reassign them and leak state into another
+  public static final IntColumn intColumn = intColumn("int.column");
+  public static final LongColumn longColumn = longColumn("long.column");
+  public static final FloatColumn floatColumn = floatColumn("float.column");
+  public static final DoubleColumn doubleColumn = doubleColumn("double.column");
+  public static final BooleanColumn booleanColumn = booleanColumn("boolean.column");
+  public static final BinaryColumn binaryColumn = binaryColumn("binary.column");
+
+  /**
+   * A Comparable type that is not a supported FilterPredicate column type; its
+   * binary name appears verbatim in the expected error messages below.
+   */
+  private static class InvalidColumnType implements Comparable<InvalidColumnType> {
+    @Override
+    public int compareTo(InvalidColumnType o) {
+      return 0;
+    }
+  }
+
+  public static final Column<InvalidColumnType> invalidColumn =
+      new Column<InvalidColumnType>(ColumnPath.get("invalid.column"), InvalidColumnType.class) { };
+
+  @Test
+  public void testValidTypes() {
+    assertTypeValid(intColumn, PrimitiveTypeName.INT32, null);
+    assertTypeValid(longColumn, PrimitiveTypeName.INT64, null);
+    assertTypeValid(floatColumn, PrimitiveTypeName.FLOAT, null);
+    assertTypeValid(doubleColumn, PrimitiveTypeName.DOUBLE, null);
+    assertTypeValid(booleanColumn, PrimitiveTypeName.BOOLEAN, null);
+    assertTypeValid(binaryColumn, PrimitiveTypeName.BINARY, null);
+    assertTypeValid(binaryColumn, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, null);
+    assertTypeValid(binaryColumn, PrimitiveTypeName.BINARY, OriginalType.UTF8);
+    assertTypeValid(binaryColumn, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, OriginalType.UTF8);
+  }
+
+  @Test
+  public void testMismatchedTypes() {
+    try {
+      assertTypeValid(intColumn, PrimitiveTypeName.DOUBLE, null);
+      fail("This should throw!");
+    } catch (IllegalArgumentException e) {
+      assertEquals("FilterPredicate column: int.column's declared type (java.lang.Integer) does not match the "
+          + "schema found in file metadata. Column int.column is of type: "
+          + "FullTypeDescriptor(PrimitiveType: DOUBLE, OriginalType: null)\n"
+          + "Valid types for this column are: [class java.lang.Double]", e.getMessage());
+    }
+  }
+
+  @Test
+  public void testUnsupportedType() {
+    // schema type that has supported column types: the message lists them
+    try {
+      assertTypeValid(invalidColumn, PrimitiveTypeName.INT32, null);
+      fail("This should throw!");
+    } catch (IllegalArgumentException e) {
+      assertEquals("Column invalid.column was declared as type: "
+          + "org.apache.parquet.filter2.predicate.TestValidTypeMap$InvalidColumnType which is not supported "
+          + "in FilterPredicates. Supported types for this column are: [class java.lang.Integer]", e.getMessage());
+    }
+
+    // schema type with no supported column types at all (INT32 + UTF8)
+    try {
+      assertTypeValid(invalidColumn, PrimitiveTypeName.INT32, OriginalType.UTF8);
+      fail("This should throw!");
+    } catch (IllegalArgumentException e) {
+      assertEquals("Column invalid.column was declared as type: "
+          + "org.apache.parquet.filter2.predicate.TestValidTypeMap$InvalidColumnType which is not supported "
+          + "in FilterPredicates. There are no supported types for columns of FullTypeDescriptor(PrimitiveType: INT32, OriginalType: UTF8)",
+          e.getMessage());
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateEvaluator.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateEvaluator.java b/parquet-column/src/test/java/org/apache/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateEvaluator.java
new file mode 100644
index 0000000..1d872a1
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateEvaluator.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import org.junit.Test;
+
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.And;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Or;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicateEvaluator.evaluate;
+
+public class TestIncrementallyUpdatedFilterPredicateEvaluator {
+
+ public static class ShortCircuitException extends RuntimeException {
+ public ShortCircuitException() {
+ super("this was supposed to short circuit and never get here!");
+ }
+ }
+
+ public static ValueInspector intIsNull() {
+ return new ValueInspector() {
+ @Override
+ public void updateNull() {
+ setResult(true);
+ }
+
+ @Override
+ public void update(int value) {
+ setResult(false);
+ }
+ };
+ }
+
+ public static ValueInspector intIsEven() {
+ return new ValueInspector() {
+ @Override
+ public void updateNull() {
+ setResult(false);
+ }
+
+ @Override
+ public void update(int value) {
+ setResult(value % 2 == 0);
+ }
+ };
+ }
+
+ public static ValueInspector doubleMoreThan10() {
+ return new ValueInspector() {
+ @Override
+ public void updateNull() {
+ setResult(false);
+ }
+
+ @Override
+ public void update(double value) {
+ setResult(value > 10.0);
+ }
+ };
+ }
+
+ @Test
+ public void testValueInspector() {
+ // known, and set to false criteria, null considered false
+ ValueInspector v = intIsEven();
+ v.update(11);
+ assertFalse(evaluate(v));
+ v.reset();
+
+ // known and set to true criteria, null considered false
+ v.update(12);
+ assertTrue(evaluate(v));
+ v.reset();
+
+ // known and set to null, null considered false
+ v.updateNull();
+ assertFalse(evaluate(v));
+ v.reset();
+
+ // known, and set to false criteria, null considered true
+ ValueInspector intIsNull = intIsNull();
+ intIsNull.update(10);
+ assertFalse(evaluate(intIsNull));
+ intIsNull.reset();
+
+ // known, and set to false criteria, null considered true
+ intIsNull.updateNull();
+ assertTrue(evaluate(intIsNull));
+ intIsNull.reset();
+
+ // unknown, null considered false
+ v.reset();
+ assertFalse(evaluate(v));
+
+ // unknown, null considered true
+ intIsNull.reset();
+ assertTrue(evaluate(intIsNull));
+ }
+
+ private void doOrTest(ValueInspector v1, ValueInspector v2, int v1Value, int v2Value, boolean expected) {
+ v1.update(v1Value);
+ v2.update(v2Value);
+ IncrementallyUpdatedFilterPredicate or = new Or(v1, v2);
+ assertEquals(expected, evaluate(or));
+ v1.reset();
+ v2.reset();
+ }
+
+ private void doAndTest(ValueInspector v1, ValueInspector v2, int v1Value, int v2Value, boolean expected) {
+ v1.update(v1Value);
+ v2.update(v2Value);
+ IncrementallyUpdatedFilterPredicate and = new And(v1, v2);
+ assertEquals(expected, evaluate(and));
+ v1.reset();
+ v2.reset();
+ }
+
+
+ @Test
+ public void testOr() {
+ ValueInspector v1 = intIsEven();
+ ValueInspector v2 = intIsEven();
+
+ int F = 11;
+ int T = 12;
+
+ // F || F == F
+ doOrTest(v1, v2, F, F, false);
+ // F || T == T
+ doOrTest(v1, v2, F, T, true);
+ // T || F == T
+ doOrTest(v1, v2, T, F, true);
+ // T || T == T
+ doOrTest(v1, v2, T, T, true);
+
+ }
+
+ @Test
+ public void testAnd() {
+ ValueInspector v1 = intIsEven();
+ ValueInspector v2 = intIsEven();
+
+ int F = 11;
+ int T = 12;
+
+ // F && F == F
+ doAndTest(v1, v2, F, F, false);
+ // F && T == F
+ doAndTest(v1, v2, F, T, false);
+ // T && F == F
+ doAndTest(v1, v2, T, F, false);
+ // T && T == T
+ doAndTest(v1, v2, T, T, true);
+
+ }
+
+ @Test
+ public void testShortCircuit() {
+ ValueInspector neverCalled = new ValueInspector() {
+ @Override
+ public boolean accept(Visitor visitor) {
+ throw new ShortCircuitException();
+ }
+ };
+
+ try {
+ evaluate(neverCalled);
+ fail("this should throw");
+ } catch (ShortCircuitException e) {
+ //
+ }
+
+ // T || X should evaluate to true without inspecting X
+ ValueInspector v = intIsEven();
+ v.update(10);
+ IncrementallyUpdatedFilterPredicate or = new Or(v, neverCalled);
+ assertTrue(evaluate(or));
+ v.reset();
+
+ // F && X should evaluate to false without inspecting X
+ v.update(11);
+ IncrementallyUpdatedFilterPredicate and = new And(v, neverCalled);
+ assertFalse(evaluate(and));
+ v.reset();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateResetter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateResetter.java b/parquet-column/src/test/java/org/apache/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateResetter.java
new file mode 100644
index 0000000..f037a9d
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateResetter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+
+import org.junit.Test;
+
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.And;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Or;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.apache.parquet.filter2.recordlevel.TestIncrementallyUpdatedFilterPredicateEvaluator.doubleMoreThan10;
+import static org.apache.parquet.filter2.recordlevel.TestIncrementallyUpdatedFilterPredicateEvaluator.intIsEven;
+import static org.apache.parquet.filter2.recordlevel.TestIncrementallyUpdatedFilterPredicateEvaluator.intIsNull;
+
+/**
+ * Tests that {@link IncrementallyUpdatedFilterPredicateResetter#reset} returns every
+ * {@link ValueInspector} in a predicate tree to the unknown state.
+ */
+public class TestIncrementallyUpdatedFilterPredicateResetter {
+
+  /** Asserts that {@code inspector.isKnown()} equals {@code expected}. */
+  private static void assertKnown(boolean expected, ValueInspector inspector) {
+    if (expected) {
+      assertTrue(inspector.isKnown());
+    } else {
+      assertFalse(inspector.isKnown());
+    }
+  }
+
+  @Test
+  public void testReset() {
+    ValueInspector nullCheck = intIsNull();
+    ValueInspector evenCheck = intIsEven();
+    ValueInspector bigDouble = doubleMoreThan10();
+
+    // nullCheck || (evenCheck && bigDouble)
+    IncrementallyUpdatedFilterPredicate pred = new Or(nullCheck, new And(evenCheck, bigDouble));
+
+    // make every leaf known
+    nullCheck.updateNull();
+    evenCheck.update(11);
+    bigDouble.update(20.0D);
+    assertKnown(true, nullCheck);
+    assertKnown(true, evenCheck);
+    assertKnown(true, bigDouble);
+
+    // resetting the root clears every leaf
+    IncrementallyUpdatedFilterPredicateResetter.reset(pred);
+    assertKnown(false, nullCheck);
+    assertKnown(false, evenCheck);
+    assertKnown(false, bigDouble);
+
+    // only one leaf known
+    nullCheck.updateNull();
+    assertKnown(true, nullCheck);
+    assertKnown(false, evenCheck);
+    assertKnown(false, bigDouble);
+
+    // resetting again still leaves everything unknown
+    IncrementallyUpdatedFilterPredicateResetter.reset(pred);
+    assertKnown(false, nullCheck);
+    assertKnown(false, evenCheck);
+    assertKnown(false, bigDouble);
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/filter2/recordlevel/TestValueInspector.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/filter2/recordlevel/TestValueInspector.java b/parquet-column/src/test/java/org/apache/parquet/filter2/recordlevel/TestValueInspector.java
new file mode 100644
index 0000000..263aa84
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/filter2/recordlevel/TestValueInspector.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.apache.parquet.filter2.recordlevel.TestIncrementallyUpdatedFilterPredicateEvaluator.intIsEven;
+
+/**
+ * Tests the lifecycle of a {@link ValueInspector}. It starts unknown, accepts one update
+ * per cycle, and is reusable after {@link ValueInspector#reset()}.
+ */
+public class TestValueInspector {
+
+  /** Asserts that {@code v.getResult()} throws because the result is not yet known. */
+  private static void assertResultUnknown(ValueInspector v) {
+    try {
+      v.getResult();
+      fail("this should throw");
+    } catch (IllegalStateException e) {
+      assertEquals("getResult() called on a ValueInspector whose result is not yet known!", e.getMessage());
+    }
+  }
+
+  @Test
+  public void testLifeCycle() {
+    ValueInspector v = intIsEven();
+
+    // begins in unknown state, and getResult() throws while unknown
+    assertFalse(v.isKnown());
+    assertResultUnknown(v);
+
+    // update state to known
+    v.update(10);
+
+    // v was updated with value 10, so result is known and should be true
+    assertTrue(v.isKnown());
+    assertTrue(v.getResult());
+
+    // calling update w/o resetting should throw
+    try {
+      v.update(11);
+      fail("this should throw");
+    } catch (IllegalStateException e) {
+      assertEquals("setResult() called on a ValueInspector whose result is already known!"
+          + " Did you forget to call reset()?", e.getMessage());
+    }
+
+    // back to unknown state, and getResult() throws again
+    v.reset();
+    assertFalse(v.isKnown());
+    assertResultUnknown(v);
+
+    // update with 11: result becomes known and is false because 11 is odd
+    v.update(11);
+    assertTrue(v.isKnown());
+    assertFalse(v.getResult());
+  }
+
+  @Test
+  public void testReusable() {
+    List<Integer> values = Arrays.asList(2, 4, 7, 3, 8, 8, 11, 200);
+    ValueInspector v = intIsEven();
+
+    // one update / check / reset cycle per value on the same inspector
+    for (Integer x : values) {
+      v.update(x);
+      assertEquals(x % 2 == 0, v.getResult());
+      v.reset();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/io/ConverterConsumer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/ConverterConsumer.java b/parquet-column/src/test/java/org/apache/parquet/io/ConverterConsumer.java
new file mode 100644
index 0000000..635657b
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/io/ConverterConsumer.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+/**
+ * A {@link RecordConsumer} that forwards record events directly to a tree of
+ * {@link GroupConverter}s and {@link PrimitiveConverter}s. It walks the schema in step
+ * with startField/endField so each value is delivered to the converter for its field.
+ */
+public class ConverterConsumer extends RecordConsumer {
+
+  private final GroupConverter root;
+  private final MessageType schema;
+
+  // enclosing converters / types saved by startField and restored by endField
+  private final Deque<GroupConverter> path = new ArrayDeque<GroupConverter>();
+  private final Deque<Type> typePath = new ArrayDeque<Type>();
+  private GroupConverter current;
+  // converter of the currently open primitive field; null when none is open
+  private PrimitiveConverter currentPrimitive;
+  private Type currentType;
+
+  /**
+   * @param recordConsumer root converter that receives the events
+   * @param schema schema used to tell primitive fields from group fields
+   */
+  public ConverterConsumer(GroupConverter recordConsumer, MessageType schema) {
+    this.root = recordConsumer;
+    this.schema = schema;
+  }
+
+  @Override
+  public void startMessage() {
+    root.start();
+    this.currentType = schema;
+    this.current = root;
+  }
+
+  @Override
+  public void endMessage() {
+    root.end();
+  }
+
+  /** Descends into field {@code index} of the current group type. */
+  @Override
+  public void startField(String field, int index) {
+    path.push(current);
+    typePath.push(currentType);
+    currentType = currentType.asGroupType().getType(index);
+    if (currentType.isPrimitive()) {
+      currentPrimitive = current.getConverter(index).asPrimitiveConverter();
+    } else {
+      current = current.getConverter(index).asGroupConverter();
+    }
+  }
+
+  /** Returns to the enclosing group saved by the matching {@link #startField}. */
+  @Override
+  public void endField(String field, int index) {
+    currentType = typePath.pop();
+    current = path.pop();
+    // The field is closed, so drop its primitive converter. A stray addX() now fails
+    // fast instead of silently writing into the previous field's converter.
+    currentPrimitive = null;
+  }
+
+  @Override
+  public void startGroup() {
+    current.start();
+  }
+
+  @Override
+  public void endGroup() {
+    current.end();
+  }
+
+  @Override
+  public void addInteger(int value) {
+    currentPrimitive.addInt(value);
+  }
+
+  @Override
+  public void addLong(long value) {
+    currentPrimitive.addLong(value);
+  }
+
+  @Override
+  public void addBoolean(boolean value) {
+    currentPrimitive.addBoolean(value);
+  }
+
+  @Override
+  public void addBinary(Binary value) {
+    currentPrimitive.addBinary(value);
+  }
+
+  @Override
+  public void addFloat(float value) {
+    currentPrimitive.addFloat(value);
+  }
+
+  @Override
+  public void addDouble(double value) {
+    currentPrimitive.addDouble(value);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/io/ExpectationValidatingConverter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/ExpectationValidatingConverter.java b/parquet-column/src/test/java/org/apache/parquet/io/ExpectationValidatingConverter.java
new file mode 100644
index 0000000..914d521
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/io/ExpectationValidatingConverter.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.List;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.TypeConverter;
+
+/**
+ * A {@link RecordMaterializer} whose converters check every callback against a queue of
+ * expected event strings, in order. Field-level events are rendered as
+ * {@code <path>.<callback>(<args>)}, e.g. {@code a.b.addInt(3)}. The root converter emits
+ * {@code startMessage()} / {@code endMessage()}. No record is materialized.
+ */
+public class ExpectationValidatingConverter extends RecordMaterializer<Void> {
+
+  private final GroupConverter root;
+
+  private final Deque<String> expectations;
+  // index of the next event, reported in assertion messages
+  int count = 0;
+
+  /**
+   * Asserts that {@code got} is the next expected event and consumes it.
+   *
+   * @throws AssertionError if no events remain or {@code got} differs from the next one
+   */
+  public void validate(String got) {
+    if (expectations.isEmpty()) {
+      throw new AssertionError("event #" + count + ": no more events expected but got " + got);
+    }
+    assertEquals("event #" + count, expectations.pop(), got);
+    ++count;
+  }
+
+  public ExpectationValidatingConverter(String[] expectations, MessageType schema) {
+    this(new ArrayDeque<String>(Arrays.asList(expectations)), schema);
+  }
+
+  /**
+   * @param expectations expected events, consumed from the head in order
+   * @param schema schema from which the converter tree is built
+   */
+  public ExpectationValidatingConverter(Deque<String> expectations, MessageType schema) {
+    this.expectations = expectations;
+    this.root = (GroupConverter) schema.convertWith(new TypeConverter<Converter>() {
+
+      @Override
+      public Converter convertPrimitiveType(final List<GroupType> path, final PrimitiveType primitiveType) {
+        return new PrimitiveConverter() {
+
+          private void validate(String message) {
+            ExpectationValidatingConverter.this.validate(path(path, primitiveType) + message);
+          }
+
+          @Override
+          public void addBinary(Binary value) {
+            validate("addBinary(" + value.toStringUsingUTF8() + ")");
+          }
+
+          @Override
+          public void addBoolean(boolean value) {
+            validate("addBoolean(" + value + ")");
+          }
+
+          @Override
+          public void addDouble(double value) {
+            validate("addDouble(" + value + ")");
+          }
+
+          @Override
+          public void addFloat(float value) {
+            validate("addFloat(" + value + ")");
+          }
+
+          @Override
+          public void addInt(int value) {
+            validate("addInt(" + value + ")");
+          }
+
+          @Override
+          public void addLong(long value) {
+            validate("addLong(" + value + ")");
+          }
+        };
+      }
+
+      @Override
+      public Converter convertGroupType(final List<GroupType> path, final GroupType groupType, final List<Converter> children) {
+        return new GroupConverter() {
+
+          private void validate(String message) {
+            ExpectationValidatingConverter.this.validate(path(path, groupType) + message);
+          }
+
+          @Override
+          public void start() {
+            validate("start()");
+          }
+
+          @Override
+          public void end() {
+            validate("end()");
+          }
+
+          @Override
+          public Converter getConverter(int fieldIndex) {
+            return children.get(fieldIndex);
+          }
+
+        };
+      }
+
+      @Override
+      public Converter convertMessageType(MessageType messageType, final List<Converter> children) {
+        return new GroupConverter() {
+
+          @Override
+          public Converter getConverter(int fieldIndex) {
+            return children.get(fieldIndex);
+          }
+
+          @Override
+          public void start() {
+            validate("startMessage()");
+          }
+
+          @Override
+          public void end() {
+            validate("endMessage()");
+          }
+        };
+      }
+    });
+  }
+
+  @Override
+  public Void getCurrentRecord() {
+    return null;
+  }
+
+  /**
+   * Builds the dotted event prefix for {@code type}. Each name in {@code path} after the
+   * first is followed by '.', then {@code type}'s name and a trailing '.'.
+   */
+  private String path(List<GroupType> path, Type type) {
+    StringBuilder pathString = new StringBuilder();
+    // path.get(0) is skipped; presumably the message root, whose name is not part of the prefix
+    for (int i = 1; i < path.size(); i++) {
+      pathString.append(path.get(i).getName()).append('.');
+    }
+    pathString.append(type.getName()).append('.');
+    return pathString.toString();
+  }
+
+  @Override
+  public GroupConverter getRootConverter() {
+    return root;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/io/ExpectationValidatingRecordConsumer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/ExpectationValidatingRecordConsumer.java b/parquet-column/src/test/java/org/apache/parquet/io/ExpectationValidatingRecordConsumer.java
new file mode 100644
index 0000000..36538ea
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/io/ExpectationValidatingRecordConsumer.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Deque;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+
+/**
+ * A {@link RecordConsumer} that checks every event it receives against a deque of
+ * expected event descriptions, failing through JUnit's assertEquals on the first mismatch.
+ */
+public final class ExpectationValidatingRecordConsumer extends RecordConsumer {
+
+  /** Expected event descriptions, consumed from the head in arrival order. */
+  private final Deque<String> expectations;
+
+  /** Number of events validated so far; included in failure messages to locate the bad event. */
+  int count = 0;
+
+  /**
+   * @param expectations the expected events in order; this deque is consumed (mutated) as events arrive
+   */
+  public ExpectationValidatingRecordConsumer(Deque<String> expectations) {
+    this.expectations = expectations;
+  }
+
+  /**
+   * Asserts that {@code got} equals the next expected event, then advances the counter.
+   */
+  private void validate(String got) {
+    // pollFirst() yields null once the expectations are exhausted, so a surplus event
+    // fails as "event #N expected:<null> but was:<event>" instead of pop()'s bare
+    // NoSuchElementException that does not say which event was unexpected.
+    assertEquals("event #" + count, expectations.pollFirst(), got);
+    ++count;
+  }
+
+  @Override
+  public void startMessage() {
+    validate("startMessage()");
+  }
+
+  @Override
+  public void startGroup() {
+    validate("startGroup()");
+  }
+
+  @Override
+  public void startField(String field, int index) {
+    validate("startField("+field+", "+index+")");
+  }
+
+  @Override
+  public void endMessage() {
+    validate("endMessage()");
+  }
+
+  @Override
+  public void endGroup() {
+    validate("endGroup()");
+  }
+
+  @Override
+  public void endField(String field, int index) {
+    validate("endField("+field+", "+index+")");
+  }
+
+  /** Recorded as "addInt(...)" (not "addInteger(...)"); expectation strings must use that form. */
+  @Override
+  public void addInteger(int value) {
+    validate("addInt("+value+")");
+  }
+
+  @Override
+  public void addLong(long value) {
+    validate("addLong("+value+")");
+  }
+
+  @Override
+  public void addBoolean(boolean value) {
+    validate("addBoolean("+value+")");
+  }
+
+  /** Binary values are rendered as UTF-8 text in the event description. */
+  @Override
+  public void addBinary(Binary value) {
+    validate("addBinary("+value.toStringUsingUTF8()+")");
+  }
+
+  @Override
+  public void addFloat(float value) {
+    validate("addFloat("+value+")");
+  }
+
+  @Override
+  public void addDouble(double value) {
+    validate("addDouble("+value+")");
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java b/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java
new file mode 100644
index 0000000..3abf804
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import static org.apache.parquet.example.Paper.r1;
+import static org.apache.parquet.example.Paper.r2;
+import static org.apache.parquet.example.Paper.schema;
+import static org.apache.parquet.example.Paper.schema2;
+import static org.apache.parquet.example.Paper.schema3;
+
+import java.util.logging.Level;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.impl.ColumnWriteStoreV1;
+import org.apache.parquet.column.page.mem.MemPageStore;
+import org.apache.parquet.example.DummyRecordConverter;
+import org.apache.parquet.example.data.GroupWriter;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+
+
+/**
+ * make sure {@link Log#LEVEL} is set to {@link Level#OFF}
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class PerfTest {
+
+ public static void main(String[] args) {
+ MemPageStore memPageStore = new MemPageStore(0);
+ write(memPageStore);
+ read(memPageStore);
+ }
+
+ private static void read(MemPageStore memPageStore) {
+ read(memPageStore, schema, "read all");
+ read(memPageStore, schema, "read all");
+ read(memPageStore, schema2, "read projected");
+ read(memPageStore, schema3, "read projected no Strings");
+ }
+
+ private static void read(MemPageStore memPageStore, MessageType myschema,
+ String message) {
+ MessageColumnIO columnIO = newColumnFactory(myschema);
+ System.out.println(message);
+ RecordMaterializer<Object> recordConsumer = new DummyRecordConverter(myschema);
+ RecordReader<Object> recordReader = columnIO.getRecordReader(memPageStore, recordConsumer);
+
+ read(recordReader, 2, myschema);
+ read(recordReader, 10000, myschema);
+ read(recordReader, 10000, myschema);
+ read(recordReader, 10000, myschema);
+ read(recordReader, 10000, myschema);
+ read(recordReader, 10000, myschema);
+ read(recordReader, 100000, myschema);
+ read(recordReader, 1000000, myschema);
+ System.out.println();
+ }
+
+
+ private static void write(MemPageStore memPageStore) {
+ ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
+ MessageColumnIO columnIO = newColumnFactory(schema);
+
+ GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
+ groupWriter.write(r1);
+ groupWriter.write(r2);
+
+ write(memPageStore, groupWriter, 10000);
+ write(memPageStore, groupWriter, 10000);
+ write(memPageStore, groupWriter, 10000);
+ write(memPageStore, groupWriter, 10000);
+ write(memPageStore, groupWriter, 10000);
+ write(memPageStore, groupWriter, 100000);
+ write(memPageStore, groupWriter, 1000000);
+ columns.flush();
+ System.out.println();
+ System.out.println(columns.getBufferedSize() + " bytes used total");
+ System.out.println("max col size: "+columns.maxColMemSize()+" bytes");
+ }
+
+ private static MessageColumnIO newColumnFactory(MessageType schema) {
+ return new ColumnIOFactory().getColumnIO(schema);
+ }
+ private static void read(RecordReader<Object> recordReader, int count, MessageType schema) {
+ Object[] records = new Object[count];
+ System.gc();
+ System.out.print("no gc <");
+ long t0 = System.currentTimeMillis();
+ for (int i = 0; i < records.length; i++) {
+ records[i] = recordReader.read();
+ }
+ long t1 = System.currentTimeMillis();
+ System.out.print("> ");
+ long t = t1-t0;
+ float err = (float)100 * 2 / t; // (+/- 1 ms)
+ System.out.printf(" read %,9d recs in %,5d ms at %,9d rec/s err: %3.2f%%\n", count , t, t == 0 ? 0 : count * 1000 / t, err);
+ if (!records[0].equals("end()")) {
+ throw new RuntimeException(""+records[0]);
+ }
+ }
+
+ private static void write(MemPageStore memPageStore, GroupWriter groupWriter, int count) {
+ long t0 = System.currentTimeMillis();
+ for (int i = 0; i < count; i++) {
+ groupWriter.write(r1);
+ }
+ long t1 = System.currentTimeMillis();
+ long t = t1-t0;
+ memPageStore.addRowCount(count);
+ System.out.printf("written %,9d recs in %,5d ms at %,9d rec/s\n", count, t, t == 0 ? 0 : count * 1000 / t );
+ }
+
+}
|