parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject [24/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
Date Mon, 27 Apr 2015 23:12:21 GMT
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java b/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
new file mode 100644
index 0000000..7c7b72c
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
@@ -0,0 +1,674 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.apache.parquet.example.Paper.pr1;
+import static org.apache.parquet.example.Paper.pr2;
+import static org.apache.parquet.example.Paper.r1;
+import static org.apache.parquet.example.Paper.r2;
+import static org.apache.parquet.example.Paper.schema;
+import static org.apache.parquet.example.Paper.schema2;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.impl.ColumnWriteStoreV1;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.mem.MemPageStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.GroupWriter;
+import org.apache.parquet.example.data.simple.NanoTime;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Type.Repetition;
+
+/**
+ * Round-trip tests for the column I/O layer: records are shredded into columns through a
+ * {@code MessageColumnIO} record writer, stored in a {@link MemPageStore}, and reassembled with a
+ * {@link RecordReaderImplementation}. Each test runs once per value of {@code useDictionary}
+ * (see {@link #data()}), which is forwarded to the {@link ColumnWriteStoreV1} used for writing.
+ */
+@RunWith(Parameterized.class)
+public class TestColumnIO {
+  private static final Log LOG = Log.getLog(TestColumnIO.class);
+
+  /** Schema with one required field of every primitive type. */
+  private static final String oneOfEach =
+    "message Document {\n"
+  + "  required int64 a;\n"
+  + "  required int32 b;\n"
+  + "  required float c;\n"
+  + "  required double d;\n"
+  + "  required boolean e;\n"
+  + "  required binary f;\n"
+  + "  required int96 g;\n"
+  + "  required fixed_len_byte_array(3) h;\n"
+  + "}\n";
+
+  /** Expected textual form of {@code Paper.schema}. */
+  private static final String schemaString =
+      "message Document {\n"
+    + "  required int64 DocId;\n"
+    + "  optional group Links {\n"
+    + "    repeated int64 Backward;\n"
+    + "    repeated int64 Forward;\n"
+    + "  }\n"
+    + "  repeated group Name {\n"
+    + "    repeated group Language {\n"
+    + "      required binary Code;\n"
+    + "      optional binary Country;\n"
+    + "    }\n"
+    + "    optional binary Url;\n"
+    + "  }\n"
+    + "}\n";
+
+  /**
+   * Expected reader transitions for {@code Paper.schema}: row i is leaf column i and entry r is
+   * the index of the column reader visited next after column i at repetition level r
+   * (a value equal to the number of leaves means "end of record").
+   */
+  final int[][] expectedFSA = new int[][] {
+      { 1 },      // 0: DocId
+      { 2, 1 },   // 1: Links.Backward
+      { 3, 2 },   // 2: Links.Forward
+      { 4, 4, 4 },// 3: Name.Language.Code
+      { 5, 5, 3 },// 4: Name.Language.Country
+      { 6, 3 }    // 5: Name.Url
+  };
+
+  /** Same as {@link #expectedFSA}, for the projected {@code Paper.schema2}. */
+  final int[][] expectedFSA2 = new int[][] {
+      { 1 },      // 0: DocId
+      { 2, 1, 1 },// 1: Name.Language.Country
+  };
+
+  /** Converter events expected when {@code Paper.r1} is read back (see {@link #testPushParser()}). */
+  public static final String[] expectedEventsForR1 = {
+    "startMessage()",
+    "DocId.addLong(10)",
+    "Links.start()",
+    "Links.Forward.addLong(20)",
+    "Links.Forward.addLong(40)",
+    "Links.Forward.addLong(60)",
+    "Links.end()",
+    "Name.start()",
+    "Name.Language.start()",
+    "Name.Language.Code.addBinary(en-us)",
+    "Name.Language.Country.addBinary(us)",
+    "Name.Language.end()",
+    "Name.Language.start()",
+    "Name.Language.Code.addBinary(en)",
+    "Name.Language.end()",
+    "Name.Url.addBinary(http://A)",
+    "Name.end()",
+    "Name.start()",
+    "Name.Url.addBinary(http://B)",
+    "Name.end()",
+    "Name.start()",
+    "Name.Language.start()",
+    "Name.Language.Code.addBinary(en-gb)",
+    "Name.Language.Country.addBinary(gb)",
+    "Name.Language.end()",
+    "Name.end()",
+    "endMessage()"
+  };
+
+  /** Runs every test twice: with {@code useDictionary == true} and with {@code false}. */
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() throws IOException {
+    Object[][] data = {
+        { true },
+        { false } };
+    return Arrays.asList(data);
+  }
+
+  private final boolean useDictionary;
+
+  public TestColumnIO(boolean useDictionary) {
+    this.useDictionary = useDictionary;
+  }
+
+  @Test
+  public void testSchema() {
+    assertEquals(schemaString, schema.toString());
+  }
+
+  /**
+   * Reading with a requested schema that has an extra field: data written with fewer columns must
+   * come back with the missing column empty.
+   */
+  @Test
+  public void testReadUsingRequestedSchemaWithExtraFields(){
+    MessageType originalSchema = new MessageType("schema",
+            new PrimitiveType(REQUIRED, INT32, "a"),
+            new PrimitiveType(OPTIONAL, INT32, "b")
+    );
+    MessageType schemaWithExtraField = new MessageType("schema",
+            new PrimitiveType(OPTIONAL, INT32, "b"),
+            new PrimitiveType(OPTIONAL, INT32, "a"),
+            new PrimitiveType(OPTIONAL, INT32, "c")
+    );
+    MemPageStore memPageStoreForOriginalSchema = new MemPageStore(1);
+    MemPageStore memPageStoreForSchemaWithExtraField = new MemPageStore(1);
+    SimpleGroupFactory groupFactory = new SimpleGroupFactory(originalSchema);
+    writeGroups(originalSchema, memPageStoreForOriginalSchema, groupFactory.newGroup().append("a", 1).append("b", 2));
+
+    SimpleGroupFactory groupFactory2 = new SimpleGroupFactory(schemaWithExtraField);
+    writeGroups(schemaWithExtraField, memPageStoreForSchemaWithExtraField, groupFactory2.newGroup().append("a", 1).append("b", 2).append("c",3));
+
+    List<Group> groups = new ArrayList<Group>();
+    groups.addAll(readGroups(memPageStoreForOriginalSchema, originalSchema, schemaWithExtraField, 1));
+    groups.addAll(readGroups(memPageStoreForSchemaWithExtraField, schemaWithExtraField, schemaWithExtraField, 1));
+    // TODO: also read with an empty projection (expecting an all-empty record) once empty
+    // projections are supported.
+
+    // Rows follow the field order of schemaWithExtraField: b, a, c.
+    Object[][] expected = {
+        { 2, 1, null },
+        { 2, 1, 3 },
+    };
+    validateGroups(groups, expected);
+  }
+
+  /** Requesting an int32 column as binary must be rejected with a decoding exception. */
+  @Test
+  public void testReadUsingRequestedSchemaWithIncompatibleField(){
+    MessageType originalSchema = new MessageType("schema",
+            new PrimitiveType(OPTIONAL, INT32, "e"));
+    MemPageStore store = new MemPageStore(1);
+    SimpleGroupFactory groupFactory = new SimpleGroupFactory(originalSchema);
+    writeGroups(originalSchema, store, groupFactory.newGroup().append("e", 4));
+
+    try {
+      MessageType schemaWithIncompatibleField = new MessageType("schema",
+              new PrimitiveType(OPTIONAL, BINARY, "e")); // Incompatible schema: different type
+      readGroups(store, originalSchema, schemaWithIncompatibleField, 1);
+      fail("should have thrown an incompatible schema exception");
+    } catch (ParquetDecodingException e) {
+      assertEquals("The requested schema is not compatible with the file schema. incompatible types: optional binary e != optional int32 e", e.getMessage());
+    }
+  }
+
+  /** Requesting as required a column that was written as optional must be rejected. */
+  @Test
+  public void testReadUsingSchemaWithRequiredFieldThatWasOptional(){
+    MessageType originalSchema = new MessageType("schema",
+            new PrimitiveType(OPTIONAL, INT32, "e"));
+    MemPageStore store = new MemPageStore(1);
+    SimpleGroupFactory groupFactory = new SimpleGroupFactory(originalSchema);
+    writeGroups(originalSchema, store, groupFactory.newGroup().append("e", 4));
+
+    try {
+      MessageType schemaWithRequiredFieldThatWasOptional = new MessageType("schema",
+              new PrimitiveType(REQUIRED, INT32, "e")); // Incompatible schema: required when it was optional
+      readGroups(store, originalSchema, schemaWithRequiredFieldThatWasOptional, 1);
+      fail("should have thrown an incompatible schema exception");
+    } catch (ParquetDecodingException e) {
+      assertEquals("The requested schema is not compatible with the file schema. incompatible types: required int32 e != optional int32 e", e.getMessage());
+    }
+  }
+
+  /** Reading only a subset of the written columns returns just that subset. */
+  @Test
+  public void testReadUsingProjectedSchema(){
+    MessageType originalSchema = new MessageType("schema",
+            new PrimitiveType(REQUIRED, INT32, "a"),
+            new PrimitiveType(REQUIRED, INT32, "b")
+    );
+    MessageType projectedSchema = new MessageType("schema",
+            new PrimitiveType(OPTIONAL, INT32, "b")
+    );
+    MemPageStore store = new MemPageStore(1);
+    SimpleGroupFactory groupFactory = new SimpleGroupFactory(originalSchema);
+    writeGroups(originalSchema, store, groupFactory.newGroup().append("a", 1).append("b", 2));
+
+    List<Group> groups = new ArrayList<Group>();
+    groups.addAll(readGroups(store, originalSchema, projectedSchema, 1));
+    Object[][] expected = {
+        { 2 },
+    };
+    validateGroups(groups, expected);
+  }
+
+  /**
+   * Asserts that {@code groups} matches {@code expected} row by row. Each expected row holds one
+   * entry per field index: {@code null} means the field must be empty (repetition count 0); any
+   * other value means the field must hold exactly one integer equal to it.
+   */
+  private void validateGroups(List<Group> groups, Object[][] expected) {
+    // Check the count first so missing or surplus records fail with a clear message.
+    assertEquals("number of groups read", expected.length, groups.size());
+    Iterator<Group> it = groups.iterator();
+    for (int i = 0; i < expected.length; i++) {
+      Object[] row = expected[i];
+      Group next = it.next();
+      for (int j = 0; j < row.length; j++) {
+        Object value = row[j];
+        if (value == null) {
+          assertEquals("looking for r[" + i + "][" + j + "] to be empty", 0, next.getFieldRepetitionCount(j));
+        } else {
+          assertEquals("looking for r[" + i + "][" + j + "][0]=" + value, 1, next.getFieldRepetitionCount(j));
+          assertEquals(value, next.getInteger(j, 0));
+        }
+      }
+    }
+  }
+
+  /**
+   * Reads {@code n} records from {@code memPageStore}, which was written with {@code fileSchema},
+   * projecting them onto {@code requestedSchema}.
+   */
+  private List<Group> readGroups(MemPageStore memPageStore, MessageType fileSchema, MessageType requestedSchema, int n) {
+    ColumnIOFactory columnIOFactory = new ColumnIOFactory(true);
+    MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema);
+    RecordReaderImplementation<Group> recordReader = getRecordReader(columnIO, requestedSchema, memPageStore);
+    List<Group> groups = new ArrayList<Group>();
+    for (int i = 0; i < n; i++) {
+      groups.add(recordReader.read());
+    }
+    return groups;
+  }
+
+  /** Writes {@code groups} with {@code writtenSchema} into {@code memPageStore} and flushes. */
+  private void writeGroups(MessageType writtenSchema, MemPageStore memPageStore, Group... groups) {
+    ColumnIOFactory columnIOFactory = new ColumnIOFactory(true);
+    ColumnWriteStoreV1 columns = newColumnWriteStore(memPageStore);
+    MessageColumnIO columnIO = columnIOFactory.getColumnIO(writtenSchema);
+    GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), writtenSchema);
+    for (Group group : groups) {
+      groupWriter.write(group);
+    }
+    columns.flush();
+  }
+
+  /**
+   * Writes the Dremel paper records r1 and r2, then reads them back with the full schema and
+   * with the projected schema2, checking the reader transitions and the reassembled records.
+   */
+  @Test
+  public void testColumnIO() {
+    log(schema);
+    log("r1");
+    log(r1);
+    log("r2");
+    log(r2);
+
+    MemPageStore memPageStore = new MemPageStore(2);
+    ColumnWriteStoreV1 columns = newColumnWriteStore(memPageStore);
+
+    ColumnIOFactory columnIOFactory = new ColumnIOFactory(true);
+    {
+      MessageColumnIO columnIO = columnIOFactory.getColumnIO(schema);
+      log(columnIO);
+      GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
+      groupWriter.write(r1);
+      groupWriter.write(r2);
+      columns.flush();
+      log(columns);
+      log("=========");
+
+      RecordReaderImplementation<Group> recordReader = getRecordReader(columnIO, schema, memPageStore);
+
+      validateFSA(expectedFSA, columnIO, recordReader);
+
+      List<Group> records = new ArrayList<Group>();
+      records.add(recordReader.read());
+      records.add(recordReader.read());
+
+      int i = 0;
+      for (Group record : records) {
+        log("r" + (++i));
+        log(record);
+      }
+      assertEquals("deserialization does not display the same result", r1.toString(), records.get(0).toString());
+      assertEquals("deserialization does not display the same result", r2.toString(), records.get(1).toString());
+    }
+    {
+      // Re-read the same pages through the projected schema2.
+      MessageColumnIO columnIO2 = columnIOFactory.getColumnIO(schema2);
+
+      List<Group> records = new ArrayList<Group>();
+      RecordReaderImplementation<Group> recordReader = getRecordReader(columnIO2, schema2, memPageStore);
+
+      validateFSA(expectedFSA2, columnIO2, recordReader);
+
+      records.add(recordReader.read());
+      records.add(recordReader.read());
+
+      int i = 0;
+      for (Group record : records) {
+        log("r" + (++i));
+        log(record);
+      }
+      assertEquals("deserialization does not display the expected result", pr1.toString(), records.get(0).toString());
+      assertEquals("deserialization does not display the expected result", pr2.toString(), records.get(1).toString());
+    }
+  }
+
+  /** Round-trips a record holding one value of every primitive type. */
+  @Test
+  public void testOneOfEach() {
+    MessageType oneOfEachSchema = MessageTypeParser.parseMessageType(oneOfEach);
+    GroupFactory gf = new SimpleGroupFactory(oneOfEachSchema);
+    Group g1 = gf.newGroup()
+        .append("a", 1L)
+        .append("b", 2)
+        .append("c", 3.0f)
+        .append("d", 4.0d)
+        .append("e", true)
+        .append("f", Binary.fromString("6"))
+        // Fixed (not clock-derived) int96 value keeps the test data deterministic.
+        .append("g", new NanoTime(1234, 123456789L))
+        .append("h", Binary.fromString("abc"));
+
+    assertRoundTrip(oneOfEachSchema, Arrays.asList(g1));
+  }
+
+  /** Round-trips a required group nested in the message with a required leaf. */
+  @Test
+  public void testRequiredOfRequired() {
+    MessageType reqreqSchema = MessageTypeParser.parseMessageType(
+          "message Document {\n"
+        + "  required group foo {\n"
+        + "    required int64 bar;\n"
+        + "  }\n"
+        + "}\n");
+
+    GroupFactory gf = new SimpleGroupFactory(reqreqSchema);
+    Group g1 = gf.newGroup();
+    g1.addGroup("foo").append("bar", 2L);
+
+    assertRoundTrip(reqreqSchema, Arrays.asList(g1));
+  }
+
+  /**
+   * Round-trips a binary leaf nested under 0-5 required groups, covering three shapes:
+   * required leaf; optional leaf (defined and undefined); and an optional leaf under six groups
+   * of which exactly one is optional (defined and undefined).
+   */
+  @Test
+  public void testOptionalRequiredInteraction() {
+    // Required leaf under i required groups.
+    for (int i = 0; i < 6; i++) {
+      Type current = new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BINARY, "primitive");
+      for (int j = 0; j < i; j++) {
+        current = new GroupType(Repetition.REQUIRED, "req" + j, current);
+      }
+      MessageType groupSchema = new MessageType("schema"+i, current);
+      GroupFactory gf = new SimpleGroupFactory(groupSchema);
+      List<Group> groups = new ArrayList<Group>();
+      Group root = gf.newGroup();
+      Group currentGroup = root;
+      for (int j = 0; j < i; j++) {
+        currentGroup = currentGroup.addGroup(0);
+      }
+      currentGroup.add(0, Binary.fromString("foo"));
+      groups.add(root);
+      assertRoundTrip(groupSchema, groups);
+    }
+    // Optional leaf under i required groups: one record with the leaf set, one without.
+    for (int i = 0; i < 6; i++) {
+      Type current = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "primitive");
+      for (int j = 0; j < i; j++) {
+        current = new GroupType(Repetition.REQUIRED, "req" + j, current);
+      }
+      MessageType groupSchema = new MessageType("schema"+(i+6), current);
+      GroupFactory gf = new SimpleGroupFactory(groupSchema);
+      List<Group> groups = new ArrayList<Group>();
+      Group rootDefined = gf.newGroup();
+      Group rootUndefined = gf.newGroup();
+      Group currentDefinedGroup = rootDefined;
+      Group currentUndefinedGroup = rootUndefined;
+      for (int j = 0; j < i; j++) {
+        currentDefinedGroup = currentDefinedGroup.addGroup(0);
+        currentUndefinedGroup = currentUndefinedGroup.addGroup(0);
+      }
+      currentDefinedGroup.add(0, Binary.fromString("foo"));
+      groups.add(rootDefined);
+      groups.add(rootUndefined);
+      assertRoundTrip(groupSchema, groups);
+    }
+    // Optional leaf under six groups where only "req" + i is optional; the undefined record
+    // only creates the groups above that optional one.
+    for (int i = 0; i < 6; i++) {
+      Type current = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "primitive");
+      for (int j = 0; j < 6; j++) {
+        current = new GroupType(i==j ? Repetition.OPTIONAL : Repetition.REQUIRED, "req" + j, current);
+      }
+      MessageType groupSchema = new MessageType("schema"+(i+12), current);
+      GroupFactory gf = new SimpleGroupFactory(groupSchema);
+      List<Group> groups = new ArrayList<Group>();
+      Group rootDefined = gf.newGroup();
+      Group rootUndefined = gf.newGroup();
+      Group currentDefinedGroup = rootDefined;
+      Group currentUndefinedGroup = rootUndefined;
+      for (int j = 0; j < 6; j++) {
+        currentDefinedGroup = currentDefinedGroup.addGroup(0);
+        if (i < j) {
+          currentUndefinedGroup = currentUndefinedGroup.addGroup(0);
+        }
+      }
+      currentDefinedGroup.add(0, Binary.fromString("foo"));
+      groups.add(rootDefined);
+      groups.add(rootUndefined);
+      assertRoundTrip(groupSchema, groups);
+    }
+  }
+
+  /**
+   * Writes {@code groups} with {@code messageSchema}, reads them back with the same schema and
+   * asserts that each record's string form is unchanged.
+   */
+  private void assertRoundTrip(MessageType messageSchema, List<Group> groups) {
+    MemPageStore memPageStore = new MemPageStore(groups.size());
+    ColumnWriteStoreV1 columns = newColumnWriteStore(memPageStore);
+
+    ColumnIOFactory columnIOFactory = new ColumnIOFactory(true);
+    MessageColumnIO columnIO = columnIOFactory.getColumnIO(messageSchema);
+    log(columnIO);
+
+    // Write groups.
+    GroupWriter groupWriter =
+        new GroupWriter(columnIO.getRecordWriter(columns), messageSchema);
+    for (Group group : groups) {
+      groupWriter.write(group);
+    }
+    columns.flush();
+
+    // Read groups and verify.
+    RecordReaderImplementation<Group> recordReader =
+        getRecordReader(columnIO, messageSchema, memPageStore);
+    for (Group group : groups) {
+      final Group got = recordReader.read();
+      assertEquals("deserialization does not display the same result",
+                   group.toString(), got.toString());
+    }
+  }
+
+  /** Builds a record reader that materializes {@link Group}s for {@code schema}. */
+  private RecordReaderImplementation<Group> getRecordReader(MessageColumnIO columnIO, MessageType schema, PageReadStore pageReadStore) {
+    RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
+
+    return (RecordReaderImplementation<Group>)columnIO.getRecordReader(pageReadStore, recordConverter);
+  }
+
+  private void log(Object o) {
+    LOG.info(o);
+  }
+
+  /**
+   * Asserts that, for every leaf column i and repetition level r listed in {@code expected},
+   * the reader's next column reader index equals {@code expected[i][r]}.
+   */
+  private void validateFSA(int[][] expected, MessageColumnIO columnIO, RecordReaderImplementation<?> recordReader) {
+    log("FSA: ----");
+    List<PrimitiveColumnIO> leaves = columnIO.getLeaves();
+    for (int i = 0; i < leaves.size(); ++i) {
+      PrimitiveColumnIO primitiveColumnIO = leaves.get(i);
+      log(Arrays.toString(primitiveColumnIO.getFieldPath()));
+      for (int r = 0; r < expected[i].length; r++) {
+        int next = expected[i][r];
+        log(" "+r+" -> "+ (next==leaves.size() ? "end" : Arrays.toString(leaves.get(next).getFieldPath()))+": "+recordReader.getNextLevel(i, r));
+        assertEquals(Arrays.toString(primitiveColumnIO.getFieldPath())+": "+r+" -> ", next, recordReader.getNextReader(i, r));
+      }
+    }
+    log("----");
+  }
+
+  /** Reads r1 back through a converter that checks each event against {@link #expectedEventsForR1}. */
+  @Test
+  public void testPushParser() {
+    MemPageStore memPageStore = new MemPageStore(1);
+    ColumnWriteStoreV1 columns = newColumnWriteStore(memPageStore);
+    MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
+    new GroupWriter(columnIO.getRecordWriter(columns), schema).write(r1);
+    columns.flush();
+
+    RecordReader<Void> recordReader = columnIO.getRecordReader(memPageStore, new ExpectationValidatingConverter(expectedEventsForR1, schema));
+    recordReader.read();
+  }
+
+  /** Creates a V1 column write store using this run's {@code useDictionary} setting. */
+  private ColumnWriteStoreV1 newColumnWriteStore(MemPageStore memPageStore) {
+    return new ColumnWriteStoreV1(memPageStore, 800, 800, useDictionary, WriterVersion.PARQUET_1_0);
+  }
+
+  /** Ending a field that received no value must be rejected by the record writer. */
+  @Test
+  public void testEmptyField() {
+    MemPageStore memPageStore = new MemPageStore(1);
+    ColumnWriteStoreV1 columns = newColumnWriteStore(memPageStore);
+    MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
+    final RecordConsumer recordWriter = columnIO.getRecordWriter(columns);
+    recordWriter.startMessage();
+    recordWriter.startField("DocId", 0);
+    recordWriter.addLong(0);
+    recordWriter.endField("DocId", 0);
+    recordWriter.startField("Links", 1);
+    try {
+      recordWriter.endField("Links", 1);
+      Assert.fail("expected exception because of empty field");
+    } catch (ParquetEncodingException e) {
+      Assert.assertEquals("empty fields are illegal, the field should be ommited completely instead", e.getMessage());
+    }
+  }
+
+  /** Pipes records through GroupWriter straight into a Group converter, without any column storage. */
+  @Test
+  public void testGroupWriter() {
+    List<Group> result = new ArrayList<Group>();
+    final GroupRecordConverter groupRecordConverter = new GroupRecordConverter(schema);
+    RecordConsumer groupConsumer = new ConverterConsumer(groupRecordConverter.getRootConverter(), schema);
+    GroupWriter groupWriter = new GroupWriter(new RecordConsumerLoggingWrapper(groupConsumer), schema);
+    groupWriter.write(r1);
+    result.add(groupRecordConverter.getCurrentRecord());
+    groupWriter.write(r2);
+    result.add(groupRecordConverter.getCurrentRecord());
+    // Expected value (the source record) first, actual (the re-materialized record) second.
+    assertEquals("deserialization does not display the expected result", r1.toString(), result.get(0).toString());
+    assertEquals("deserialization does not display the expected result", r2.toString(), result.get(1).toString());
+  }
+
+  /** Checks the exact value/repetition/definition-level sequence the record writer emits for r1 and r2. */
+  @Test
+  public void testWriteWithGroupWriter() {
+
+    final String[] expected = {
+        "[DocId]: 10, r:0, d:0",
+        "[Links, Forward]: 20, r:0, d:2",
+        "[Links, Forward]: 40, r:1, d:2",
+        "[Links, Forward]: 60, r:1, d:2",
+        "[Links, Backward]: null, r:0, d:1",
+        "[Name, Language, Code]: en-us, r:0, d:2",
+        "[Name, Language, Country]: us, r:0, d:3",
+        "[Name, Language, Code]: en, r:2, d:2",
+        "[Name, Language, Country]: null, r:2, d:2",
+        "[Name, Url]: http://A, r:0, d:2",
+        "[Name, Url]: http://B, r:1, d:2",
+        "[Name, Language, Code]: null, r:1, d:1",
+        "[Name, Language, Country]: null, r:1, d:1",
+        "[Name, Language, Code]: en-gb, r:1, d:2",
+        "[Name, Language, Country]: gb, r:1, d:3",
+        "[Name, Url]: null, r:1, d:1",
+        "[DocId]: 20, r:0, d:0",
+        "[Links, Backward]: 10, r:0, d:2",
+        "[Links, Backward]: 30, r:1, d:2",
+        "[Links, Forward]: 80, r:0, d:2",
+        "[Name, Url]: http://C, r:0, d:2",
+        "[Name, Language, Code]: null, r:0, d:1",
+        "[Name, Language, Country]: null, r:0, d:1"
+    };
+
+    ValidatingColumnWriteStore columns = new ValidatingColumnWriteStore(expected);
+    MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
+    GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
+    groupWriter.write(r1);
+    groupWriter.write(r2);
+    columns.validate();
+  }
+}
+/**
+ * A {@link ColumnWriteStore} that stores nothing. Instead, every value or null written through
+ * any of its column writers is formatted as {@code "[path]: value, r:R, d:D"} and compared, in
+ * order, with the next entry of an expected event list.
+ */
+final class ValidatingColumnWriteStore implements ColumnWriteStore {
+  private final String[] expected;
+  /** Index of the next expected event; shared by every column writer this store hands out. */
+  int counter = 0;
+
+  ValidatingColumnWriteStore(String[] expected) {
+    this.expected = expected;
+  }
+
+  @Override
+  public ColumnWriter getColumnWriter(final ColumnDescriptor path) {
+    return new ColumnWriter() {
+      /** Formats one write event for {@code path} and asserts it equals the next expected event. */
+      private void validate(Object value, int repetitionLevel,
+          int definitionLevel) {
+        String actual = Arrays.toString(path.getPath())+": "+value+", r:"+repetitionLevel+", d:"+definitionLevel;
+        // Report surplus events as an assertion failure rather than an ArrayIndexOutOfBoundsException.
+        if (counter >= expected.length) {
+          fail("unexpected event #" + counter + " (only " + expected.length + " expected): " + actual);
+        }
+        assertEquals("event #" + counter, expected[counter], actual);
+        ++ counter;
+      }
+
+      @Override
+      public void writeNull(int repetitionLevel, int definitionLevel) {
+        validate(null, repetitionLevel, definitionLevel);
+      }
+
+      @Override
+      public void write(Binary value, int repetitionLevel, int definitionLevel) {
+        // Binary values are compared by their UTF-8 decoded text.
+        validate(value.toStringUsingUTF8(), repetitionLevel, definitionLevel);
+      }
+
+      @Override
+      public void write(boolean value, int repetitionLevel, int definitionLevel) {
+        validate(value, repetitionLevel, definitionLevel);
+      }
+
+      @Override
+      public void write(int value, int repetitionLevel, int definitionLevel) {
+        validate(value, repetitionLevel, definitionLevel);
+      }
+
+      @Override
+      public void write(long value, int repetitionLevel, int definitionLevel) {
+        validate(value, repetitionLevel, definitionLevel);
+      }
+
+      @Override
+      public void write(float value, int repetitionLevel, int definitionLevel) {
+        validate(value, repetitionLevel, definitionLevel);
+      }
+
+      @Override
+      public void write(double value, int repetitionLevel, int definitionLevel) {
+        validate(value, repetitionLevel, definitionLevel);
+      }
+    };
+  }
+
+  /** Asserts that every expected event was observed (no events are missing). */
+  public void validate() {
+    assertEquals("read all events", expected.length, counter);
+  }
+
+  /** No-op: nothing is buffered per record. */
+  @Override
+  public void endRecord() {
+  }
+
+  /** No-op: nothing is buffered. */
+  @Override
+  public void flush() {
+  }
+
+  /** Always 0: this store holds no data. */
+  @Override
+  public long getAllocatedSize() {
+    return 0;
+  }
+
+  /** Always 0: this store holds no data. */
+  @Override
+  public long getBufferedSize() {
+    return 0;
+  }
+
+  /** Not meaningful for this validating stub; returns {@code null}. */
+  @Override
+  public String memUsageString() {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java b/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java
new file mode 100644
index 0000000..479b138
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java
@@ -0,0 +1,270 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.impl.ColumnWriteStoreV1;
+import org.apache.parquet.column.page.mem.MemPageStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupWriter;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.filter.ColumnPredicates.LongPredicateFunction;
+import org.apache.parquet.filter.ColumnPredicates.PredicateFunction;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.io.api.RecordMaterializer;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.parquet.example.Paper.r1;
+import static org.apache.parquet.example.Paper.r2;
+import static org.apache.parquet.example.Paper.schema;
+import static org.apache.parquet.filter.AndRecordFilter.and;
+import static org.apache.parquet.filter.ColumnPredicates.applyFunctionToLong;
+import static org.apache.parquet.filter.ColumnPredicates.applyFunctionToString;
+import static org.apache.parquet.filter.ColumnPredicates.equalTo;
+import static org.apache.parquet.filter.ColumnRecordFilter.column;
+import static org.apache.parquet.filter.NotRecordFilter.not;
+import static org.apache.parquet.filter.OrRecordFilter.or;
+import static org.apache.parquet.filter.PagedRecordFilter.page;
+
+public class TestFiltered {
+
+  /**
+   * applyFunction predicate for long columns: accepts values strictly greater than 15.
+   * Declared static so instances do not capture the enclosing test object.
+   */
+  public static class LongGreaterThan15Predicate implements LongPredicateFunction {
+    @Override
+    public boolean functionToApply(long input) {
+      return input > 15;
+    }
+  }
+
+  /**
+   * applyFunction predicate for string columns: accepts strings ending in "A".
+   * Declared static so instances do not capture the enclosing test object.
+   */
+  public static class StringEndsWithAPredicate implements PredicateFunction<String> {
+    @Override
+    public boolean functionToApply(String input) {
+      return input.endsWith("A");
+    }
+  }
+
+  /** Reads from {@code reader} until it returns null and returns every record produced. */
+  private List<Group> readAll(RecordReader<Group> reader) {
+    List<Group> result = new ArrayList<Group>();
+    Group g;
+    while ((g = reader.read()) != null) {
+      result.add(g);
+    }
+    return result;
+  }
+
+  /**
+   * Asserts that {@code reader} yields exactly one record and that it matches
+   * {@code expected} (compared by string form).
+   */
+  private void readOne(RecordReader<Group> reader, String message, Group expected) {
+    List<Group> result = readAll(reader);
+    assertEquals(message + ": " + result, 1, result.size());
+    assertEquals("filtering did not return the correct record", expected.toString(), result.get(0).toString());
+  }
+
+  @Test
+  public void testFilterOnInteger() {
+    MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
+    MemPageStore memPageStore = writeTestRecords(columnIO, 1);
+
+    // Get first record
+    RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
+    RecordReader<Group> recordReader =
+        columnIO.getRecordReader(memPageStore, recordConverter,
+            FilterCompat.get(column("DocId", equalTo(10L))));
+
+    readOne(recordReader, "r2 filtered out", r1);
+
+    // Get second record
+    recordReader = columnIO.getRecordReader(memPageStore, recordConverter,
+        FilterCompat.get(column("DocId", equalTo(20L))));
+
+    readOne(recordReader, "r1 filtered out", r2);
+  }
+
+  @Test
+  public void testApplyFunctionFilterOnLong() {
+    MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
+    MemPageStore memPageStore = writeTestRecords(columnIO, 1);
+
+    // Get first record
+    RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
+    RecordReader<Group> recordReader =
+        columnIO.getRecordReader(memPageStore, recordConverter,
+            FilterCompat.get(column("DocId", equalTo(10L))));
+
+    readOne(recordReader, "r2 filtered out", r1);
+
+    // Get second record
+    recordReader = columnIO.getRecordReader(memPageStore, recordConverter,
+        FilterCompat.get(column("DocId", applyFunctionToLong(new LongGreaterThan15Predicate()))));
+
+    readOne(recordReader, "r1 filtered out", r2);
+  }
+
+  @Test
+  public void testFilterOnString() {
+    MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
+    MemPageStore memPageStore = writeTestRecords(columnIO, 1);
+
+    // First try matching against the A url in record 1
+    RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
+    RecordReader<Group> recordReader =
+        columnIO.getRecordReader(memPageStore, recordConverter,
+            FilterCompat.get(column("Name.Url", equalTo("http://A"))));
+
+    readOne(recordReader, "r2 filtered out", r1);
+
+    // Second try matching against the B url in record 1 - it should fail because the
+    // column filter only matches against the first value of a repeated field in a record
+    recordReader = columnIO.getRecordReader(memPageStore, recordConverter,
+        FilterCompat.get(column("Name.Url", equalTo("http://B"))));
+
+    List<Group> all = readAll(recordReader);
+    assertEquals("There should be no matching records: " + all , 0, all.size());
+
+    // Finally try matching against the C url in record 2
+    recordReader = columnIO.getRecordReader(memPageStore, recordConverter,
+        FilterCompat.get(column("Name.Url", equalTo("http://C"))));
+
+    readOne(recordReader, "r1 filtered out", r2);
+  }
+
+  @Test
+  public void testApplyFunctionFilterOnString() {
+    MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
+    MemPageStore memPageStore = writeTestRecords(columnIO, 1);
+
+    // First try matching against the A url in record 1
+    RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
+    RecordReader<Group> recordReader =
+        columnIO.getRecordReader(memPageStore, recordConverter,
+            FilterCompat.get(column("Name.Url", applyFunctionToString(new StringEndsWithAPredicate()))));
+
+    readOne(recordReader, "r2 filtered out", r1);
+
+    // Second try matching against the B url in record 1 - it should fail because the
+    // column filter only matches against the first value of a repeated field in a record
+    recordReader = columnIO.getRecordReader(memPageStore, recordConverter,
+        FilterCompat.get(column("Name.Url", equalTo("http://B"))));
+
+    List<Group> all = readAll(recordReader);
+    assertEquals("There should be no matching records: " + all , 0, all.size());
+
+    // Finally try matching against the C url in record 2
+    recordReader = columnIO.getRecordReader(memPageStore, recordConverter,
+        FilterCompat.get(column("Name.Url", equalTo("http://C"))));
+
+    readOne(recordReader, "r1 filtered out", r2);
+  }
+
+  @Test
+  public void testPaged() {
+    MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
+    MemPageStore memPageStore = writeTestRecords(columnIO, 6);
+
+    RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
+    RecordReader<Group> recordReader =
+        columnIO.getRecordReader(memPageStore, recordConverter,
+            FilterCompat.get(page(4, 4)));
+
+    List<Group> all = readAll(recordReader);
+    assertEquals("expecting records " + all, 4, all.size());
+    for (int i = 0; i < all.size(); i++) {
+      assertEquals("expecting record", (i % 2 == 0 ? r2 : r1).toString(), all.get(i).toString());
+    }
+  }
+
+  @Test
+  public void testFilteredAndPaged() {
+    MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
+    MemPageStore memPageStore = writeTestRecords(columnIO, 8);
+
+    RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
+    RecordReader<Group> recordReader =
+        columnIO.getRecordReader(memPageStore, recordConverter,
+            FilterCompat.get(and(column("DocId", equalTo(10L)), page(2, 4))));
+
+    List<Group> all = readAll(recordReader);
+    assertEquals("expecting 4 records " + all, 4, all.size());
+    for (int i = 0; i < all.size(); i++) {
+      assertEquals("expecting record1", r1.toString(), all.get(i).toString());
+    }
+  }
+
+  @Test
+  public void testFilteredOrPaged() {
+    MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
+    MemPageStore memPageStore = writeTestRecords(columnIO, 8);
+
+    RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
+    RecordReader<Group> recordReader =
+        columnIO.getRecordReader(memPageStore, recordConverter,
+            FilterCompat.get(or(column("DocId", equalTo(10L)),
+                column("DocId", equalTo(20L)))));
+
+    List<Group> all = readAll(recordReader);
+    // both r1 and r2 match, and 8 pairs were written, so every one of the 16 records is read
+    assertEquals("expecting 16 records " + all, 16, all.size());
+    for (int i = 0; i < all.size() / 2; i++) {
+      assertEquals("expecting record1", r1.toString(), all.get(2 * i).toString());
+      assertEquals("expecting record2", r2.toString(), all.get(2 * i + 1).toString());
+    }
+  }
+
+  @Test
+  public void testFilteredNotPaged() {
+    MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
+    MemPageStore memPageStore = writeTestRecords(columnIO, 8);
+
+    RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
+    RecordReader<Group> recordReader =
+        columnIO.getRecordReader(memPageStore, recordConverter,
+            FilterCompat.get(not(column("DocId", equalTo(10L)))));
+
+    List<Group> all = readAll(recordReader);
+    assertEquals("expecting 8 records " + all, 8, all.size());
+    for (int i = 0; i < all.size(); i++) {
+      assertEquals("expecting record2", r2.toString(), all.get(i).toString());
+    }
+  }
+
+  /**
+   * Writes the pair (r1, r2) {@code number} times into a fresh in-memory page store and
+   * returns it; the store therefore holds {@code 2 * number} records in r1, r2, r1, ... order.
+   */
+  private MemPageStore writeTestRecords(MessageColumnIO columnIO, int number) {
+    MemPageStore memPageStore = new MemPageStore(number * 2);
+    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, false, WriterVersion.PARQUET_1_0);
+
+    GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
+    for (int i = 0; i < number; i++) {
+      groupWriter.write(r1);
+      groupWriter.write(r2);
+    }
+    columns.flush();
+    return memPageStore;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/parser/TestParquetParser.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/parser/TestParquetParser.java b/parquet-column/src/test/java/org/apache/parquet/parser/TestParquetParser.java
new file mode 100644
index 0000000..dc9369b
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/parser/TestParquetParser.java
@@ -0,0 +1,313 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.parser;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.apache.parquet.schema.OriginalType.*;
+import static org.apache.parquet.schema.Types.buildMessage;
+
+import org.junit.Test;
+
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
+import org.apache.parquet.schema.Types.MessageTypeBuilder;
+
+public class TestParquetParser {
+
+  /**
+   * Parses {@code schema} and asserts the result equals {@code expected}, then re-parses the
+   * parsed type's {@code toString()} output and asserts that equals {@code expected} too, so
+   * every test also checks that the printed form round-trips through the parser.
+   */
+  private static void assertParsesAndRoundTrips(MessageType expected, String schema) {
+    MessageType parsed = parseMessageType(schema);
+    assertEquals(expected, parsed);
+    MessageType reparsed = parseMessageType(parsed.toString());
+    assertEquals(expected, reparsed);
+  }
+
+  @Test
+  public void testPaperExample() throws Exception {
+    String example =
+        "message Document {\n" +
+        "  required int64 DocId;\n" +
+        "  optional group Links {\n" +
+        "    repeated int64 Backward;\n" +
+        "    repeated int64 Forward; }\n" +
+        "  repeated group Name {\n" +
+        "    repeated group Language {\n" +
+        "      required binary Code;\n" +
+        "      required binary Country; }\n" +
+        "    optional binary Url; }}";
+    // Indentation mirrors the nesting: Links and Name are siblings under Document,
+    // and Url is a sibling of Language inside Name.
+    MessageType manuallyMade =
+        new MessageType("Document",
+            new PrimitiveType(REQUIRED, INT64, "DocId"),
+            new GroupType(OPTIONAL, "Links",
+                new PrimitiveType(REPEATED, INT64, "Backward"),
+                new PrimitiveType(REPEATED, INT64, "Forward")),
+            new GroupType(REPEATED, "Name",
+                new GroupType(REPEATED, "Language",
+                    new PrimitiveType(REQUIRED, BINARY, "Code"),
+                    new PrimitiveType(REQUIRED, BINARY, "Country")),
+                new PrimitiveType(OPTIONAL, BINARY, "Url")));
+    assertParsesAndRoundTrips(manuallyMade, example);
+
+    MessageType nestedGroups =
+        new MessageType("m",
+            new GroupType(REQUIRED, "a",
+                new PrimitiveType(REQUIRED, BINARY, "b")),
+            new GroupType(REQUIRED, "c",
+                new PrimitiveType(REQUIRED, INT64, "d")));
+    assertParsesAndRoundTrips(nestedGroups,
+        "message m { required group a {required binary b;} required group c { required int64 d; }}");
+  }
+
+  @Test
+  public void testEachPrimitiveType() {
+    MessageTypeBuilder builder = buildMessage();
+    StringBuilder schema = new StringBuilder();
+    schema.append("message EachType {\n");
+    for (PrimitiveTypeName type : PrimitiveTypeName.values()) {
+      // add a schema entry of the form "  required <type> <type>_;\n"
+      if (type == FIXED_LEN_BYTE_ARRAY) {
+        // fixed-length byte arrays need an explicit length, so they get a hand-written entry
+        schema.append("  required fixed_len_byte_array(3) fixed_;\n");
+        builder.required(FIXED_LEN_BYTE_ARRAY).length(3).named("fixed_");
+      } else {
+        schema.append("  required ").append(type)
+            .append(" ").append(type).append("_;\n");
+        builder.required(type).named(type.toString() + "_");
+      }
+    }
+    schema.append("}\n");
+    assertParsesAndRoundTrips(builder.named("EachType"), schema.toString());
+  }
+
+  @Test
+  public void testUTF8Annotation() {
+    String message =
+        "message StringMessage {\n" +
+        "  required binary string (UTF8);\n" +
+        "}\n";
+
+    MessageType expected = buildMessage()
+        .required(BINARY).as(UTF8).named("string")
+        .named("StringMessage");
+
+    assertParsesAndRoundTrips(expected, message);
+  }
+
+  @Test
+  public void testIDs() {
+    String message =
+        "message Message {\n" +
+        "  required binary string (UTF8) = 6;\n" +
+        "  required int32 i=1;\n" +
+        "  required binary s2= 3;\n" +
+        "  required binary s3 =4;\n" +
+        "}\n";
+
+    MessageType expected = buildMessage()
+        .required(BINARY).as(UTF8).id(6).named("string")
+        .required(INT32).id(1).named("i")
+        .required(BINARY).id(3).named("s2")
+        .required(BINARY).id(4).named("s3")
+        .named("Message");
+
+    assertParsesAndRoundTrips(expected, message);
+  }
+
+  @Test
+  public void testMAPAnnotations() {
+    // this is primarily to test group annotations
+    String message =
+        "message Message {\n" +
+        "  optional group aMap (MAP) {\n" +
+        "    repeated group map (MAP_KEY_VALUE) {\n" +
+        "      required binary key (UTF8);\n" +
+        "      required int32 value;\n" +
+        "    }\n" +
+        "  }\n" +
+        "}\n";
+
+    MessageType expected = buildMessage()
+        .optionalGroup()
+            .repeatedGroup()
+                .required(BINARY).as(UTF8).named("key")
+                .required(INT32).named("value")
+            .named("map")
+        .named("aMap")
+        .named("Message");
+
+    assertParsesAndRoundTrips(expected, message);
+  }
+
+  @Test
+  public void testLISTAnnotation() {
+    // this is primarily to test group annotations
+    String message =
+        "message Message {\n" +
+        "  required group aList (LIST) {\n" +
+        "    repeated binary string (UTF8);\n" +
+        "  }\n" +
+        "}\n";
+
+    MessageType expected = buildMessage()
+        .requiredGroup()
+            .repeated(BINARY).as(UTF8).named("string")
+        .named("aList")
+        .named("Message");
+
+    assertParsesAndRoundTrips(expected, message);
+  }
+
+  @Test
+  public void testDecimalFixedAnnotation() {
+    String message =
+        "message DecimalMessage {\n" +
+        "  required FIXED_LEN_BYTE_ARRAY(4) aDecimal (DECIMAL(9,2));\n" +
+        "}\n";
+
+    MessageType expected = buildMessage()
+        .required(FIXED_LEN_BYTE_ARRAY).length(4)
+        .as(DECIMAL).precision(9).scale(2)
+        .named("aDecimal")
+        .named("DecimalMessage");
+
+    assertParsesAndRoundTrips(expected, message);
+  }
+
+  @Test
+  public void testDecimalBinaryAnnotation() {
+    String message =
+        "message DecimalMessage {\n" +
+        "  required binary aDecimal (DECIMAL(9,2));\n" +
+        "}\n";
+
+    MessageType expected = buildMessage()
+        .required(BINARY).as(DECIMAL).precision(9).scale(2)
+        .named("aDecimal")
+        .named("DecimalMessage");
+
+    assertParsesAndRoundTrips(expected, message);
+  }
+
+  @Test
+  public void testTimeAnnotations() {
+    String message = "message TimeMessage {" +
+        "  required int32 date (DATE);" +
+        "  required int32 time (TIME_MILLIS);" +
+        "  required int64 timestamp (TIMESTAMP_MILLIS);" +
+        "  required FIXED_LEN_BYTE_ARRAY(12) interval (INTERVAL);" +
+        "}\n";
+
+    MessageType expected = buildMessage()
+        .required(INT32).as(DATE).named("date")
+        .required(INT32).as(TIME_MILLIS).named("time")
+        .required(INT64).as(TIMESTAMP_MILLIS).named("timestamp")
+        .required(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("interval")
+        .named("TimeMessage");
+
+    assertParsesAndRoundTrips(expected, message);
+  }
+
+  @Test
+  public void testIntAnnotations() {
+    String message = "message IntMessage {" +
+        "  required int32 i8 (INT_8);" +
+        "  required int32 i16 (INT_16);" +
+        "  required int32 i32 (INT_32);" +
+        "  required int64 i64 (INT_64);" +
+        "  required int32 u8 (UINT_8);" +
+        "  required int32 u16 (UINT_16);" +
+        "  required int32 u32 (UINT_32);" +
+        "  required int64 u64 (UINT_64);" +
+        "}\n";
+
+    MessageType expected = buildMessage()
+        .required(INT32).as(INT_8).named("i8")
+        .required(INT32).as(INT_16).named("i16")
+        .required(INT32).as(INT_32).named("i32")
+        .required(INT64).as(INT_64).named("i64")
+        .required(INT32).as(UINT_8).named("u8")
+        .required(INT32).as(UINT_16).named("u16")
+        .required(INT32).as(UINT_32).named("u32")
+        .required(INT64).as(UINT_64).named("u64")
+        .named("IntMessage");
+
+    assertParsesAndRoundTrips(expected, message);
+  }
+
+  @Test
+  public void testEmbeddedAnnotations() {
+    String message = "message EmbeddedMessage {" +
+        "  required binary json (JSON);" +
+        "  required binary bson (BSON);" +
+        "}\n";
+
+    MessageType expected = buildMessage()
+        .required(BINARY).as(JSON).named("json")
+        .required(BINARY).as(BSON).named("bson")
+        .named("EmbeddedMessage");
+
+    assertParsesAndRoundTrips(expected, message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java
new file mode 100644
index 0000000..5bc3122
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java
@@ -0,0 +1,148 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.schema;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+
+import org.junit.Test;
+
+import org.apache.parquet.example.Paper;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+
+public class TestMessageType {
+
+  /** The paper schema survives a print/parse round trip, both structurally and textually. */
+  @Test
+  public void test() throws Exception {
+    MessageType schema = MessageTypeParser.parseMessageType(Paper.schema.toString());
+    assertEquals(Paper.schema, schema);
+    assertEquals(Paper.schema.toString(), schema.toString());
+  }
+
+  /** Nested lookups and max repetition/definition levels computed on the paper schema. */
+  @Test
+  public void testNestedTypes() {
+    MessageType schema = MessageTypeParser.parseMessageType(Paper.schema.toString());
+    Type type = schema.getType("Links", "Backward");
+    assertEquals(PrimitiveTypeName.INT64,
+        type.asPrimitiveType().getPrimitiveTypeName());
+    assertEquals(0, schema.getMaxRepetitionLevel("DocId"));
+    assertEquals(1, schema.getMaxRepetitionLevel("Name"));
+    assertEquals(2, schema.getMaxRepetitionLevel("Name", "Language"));
+    assertEquals(0, schema.getMaxDefinitionLevel("DocId"));
+    assertEquals(1, schema.getMaxDefinitionLevel("Links"));
+    assertEquals(2, schema.getMaxDefinitionLevel("Links", "Backward"));
+  }
+
+  /**
+   * MessageType.union: keeps the receiver's name and field order, appends fields only present
+   * in the argument, recurses into groups, and rejects tightened repetition or changed
+   * primitive types. Every assertEquals below is (expected, actual).
+   */
+  @Test
+  public void testMergeSchema() {
+    MessageType t1 = new MessageType("root1",
+        new PrimitiveType(REPEATED, BINARY, "a"),
+        new PrimitiveType(OPTIONAL, BINARY, "b"));
+    MessageType t2 = new MessageType("root2",
+        new PrimitiveType(REQUIRED, BINARY, "c"));
+
+    assertEquals(
+        new MessageType("root1",
+            new PrimitiveType(REPEATED, BINARY, "a"),
+            new PrimitiveType(OPTIONAL, BINARY, "b"),
+            new PrimitiveType(REQUIRED, BINARY, "c")),
+        t1.union(t2));
+
+    assertEquals(
+        new MessageType("root2",
+            new PrimitiveType(REQUIRED, BINARY, "c"),
+            new PrimitiveType(REPEATED, BINARY, "a"),
+            new PrimitiveType(OPTIONAL, BINARY, "b")),
+        t2.union(t1));
+
+    MessageType t3 = new MessageType("root1",
+        new PrimitiveType(OPTIONAL, BINARY, "a"));
+    MessageType t4 = new MessageType("root2",
+        new PrimitiveType(REQUIRED, BINARY, "a"));
+
+    try {
+      t3.union(t4);
+      fail("moving from optional to required");
+    } catch (IncompatibleSchemaModificationException e) {
+      assertEquals("repetition constraint is more restrictive: can not merge type required binary a into optional binary a", e.getMessage());
+    }
+
+    // relaxing required -> optional is allowed
+    assertEquals(
+        new MessageType("root2",
+            new PrimitiveType(OPTIONAL, BINARY, "a")),
+        t4.union(t3));
+
+    MessageType t5 = new MessageType("root1",
+        new GroupType(REQUIRED, "g1",
+            new PrimitiveType(OPTIONAL, BINARY, "a")),
+        new GroupType(REQUIRED, "g2",
+            new PrimitiveType(OPTIONAL, BINARY, "b")));
+    MessageType t6 = new MessageType("root1",
+        new GroupType(REQUIRED, "g1",
+            new PrimitiveType(OPTIONAL, BINARY, "a")),
+        new GroupType(REQUIRED, "g2",
+            new GroupType(REQUIRED, "g3",
+                new PrimitiveType(OPTIONAL, BINARY, "c")),
+            new PrimitiveType(OPTIONAL, BINARY, "b")));
+
+    assertEquals(
+        new MessageType("root1",
+            new GroupType(REQUIRED, "g1",
+                new PrimitiveType(OPTIONAL, BINARY, "a")),
+            new GroupType(REQUIRED, "g2",
+                new PrimitiveType(OPTIONAL, BINARY, "b"),
+                new GroupType(REQUIRED, "g3",
+                    new PrimitiveType(OPTIONAL, BINARY, "c")))),
+        t5.union(t6));
+
+    MessageType t7 = new MessageType("root1",
+        new PrimitiveType(OPTIONAL, BINARY, "a"));
+    MessageType t8 = new MessageType("root2",
+        new PrimitiveType(OPTIONAL, INT32, "a"));
+    try {
+      t7.union(t8);
+      fail("moving from BINARY to INT32");
+    } catch (IncompatibleSchemaModificationException e) {
+      assertEquals("can not merge type optional int32 a into optional binary a", e.getMessage());
+    }
+  }
+
+  /** Field ids on primitives and groups survive a print/parse round trip. */
+  @Test
+  public void testIDs() throws Exception {
+    MessageType schema = new MessageType("test",
+        new PrimitiveType(REQUIRED, BINARY, "foo").withId(4),
+        new GroupType(REQUIRED, "bar",
+            new PrimitiveType(REQUIRED, BINARY, "baz").withId(3)
+            ).withId(8)
+        );
+    MessageType schema2 = MessageTypeParser.parseMessageType(schema.toString());
+    assertEquals(schema, schema2);
+    assertEquals(schema.toString(), schema2.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java
new file mode 100644
index 0000000..bdf7429
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java
@@ -0,0 +1,619 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.schema;
+
+import java.util.concurrent.Callable;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import static org.apache.parquet.schema.OriginalType.*;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
+import static org.apache.parquet.schema.Type.Repetition.*;
+
+/**
+ * Tests for the {@link Types} fluent builder API.
+ *
+ * <p>Most tests build a type with the builders and compare it to the same type
+ * constructed directly through the {@link MessageType}, {@link GroupType} and
+ * {@link PrimitiveType} constructors; the rest check that the builders reject
+ * invalid combinations (empty groups, missing lengths, bad annotations).
+ */
+public class TestTypeBuilders {
+  @Test
+  public void testPaperExample() {
+    MessageType expected =
+        new MessageType("Document",
+            new PrimitiveType(REQUIRED, INT64, "DocId"),
+            new GroupType(OPTIONAL, "Links",
+                new PrimitiveType(REPEATED, INT64, "Backward"),
+                new PrimitiveType(REPEATED, INT64, "Forward")),
+            new GroupType(REPEATED, "Name",
+                new GroupType(REPEATED, "Language",
+                    new PrimitiveType(REQUIRED, BINARY, "Code"),
+                    new PrimitiveType(REQUIRED, BINARY, "Country")),
+                new PrimitiveType(OPTIONAL, BINARY, "Url")));
+    // named(...) on a nested group closes it and continues with the parent builder.
+    MessageType builderType = Types.buildMessage()
+        .required(INT64).named("DocId")
+        .optionalGroup()
+            .repeated(INT64).named("Backward")
+            .repeated(INT64).named("Forward")
+            .named("Links")
+        .repeatedGroup()
+            .repeatedGroup()
+                .required(BINARY).named("Code")
+                .required(BINARY).named("Country")
+            .named("Language")
+            .optional(BINARY).named("Url")
+            .named("Name")
+        .named("Document");
+    Assert.assertEquals(expected, builderType);
+  }
+
+  @Test
+  public void testGroupTypeConstruction() {
+    PrimitiveType f1 = Types.required(BINARY).as(UTF8).named("f1");
+    PrimitiveType f2 = Types.required(INT32).named("f2");
+    PrimitiveType f3 = Types.optional(INT32).named("f3");
+    String name = "group";
+    for (Type.Repetition repetition : Type.Repetition.values()) {
+      GroupType expected = new GroupType(repetition, name,
+          f1,
+          new GroupType(repetition, "g1", f2, f3));
+      // First via the generic repetition-parameterized builder ...
+      GroupType built = Types.buildGroup(repetition)
+          .addField(f1)
+          .group(repetition).addFields(f2, f3).named("g1")
+          .named(name);
+      Assert.assertEquals(expected, built);
+
+      // ... then via the repetition-specific shortcut.
+      switch (repetition) {
+        case REQUIRED:
+          built = Types.requiredGroup()
+              .addField(f1)
+              .requiredGroup().addFields(f2, f3).named("g1")
+              .named(name);
+          break;
+        case OPTIONAL:
+          built = Types.optionalGroup()
+              .addField(f1)
+              .optionalGroup().addFields(f2, f3).named("g1")
+              .named(name);
+          break;
+        case REPEATED:
+          built = Types.repeatedGroup()
+              .addField(f1)
+              .repeatedGroup().addFields(f2, f3).named("g1")
+              .named(name);
+          break;
+      }
+      Assert.assertEquals(expected, built);
+    }
+  }
+
+  @Test
+  public void testPrimitiveTypeConstruction() {
+    // FIXED_LEN_BYTE_ARRAY needs a length, so it is covered separately by
+    // testFixedTypeConstruction.
+    PrimitiveTypeName[] types = new PrimitiveTypeName[] {
+        BOOLEAN, INT32, INT64, INT96, FLOAT, DOUBLE, BINARY
+    };
+    for (PrimitiveTypeName type : types) {
+      String name = type.toString() + "_";
+      for (Type.Repetition repetition : Type.Repetition.values()) {
+        PrimitiveType expected = new PrimitiveType(repetition, type, name);
+        PrimitiveType built = Types.primitive(type, repetition).named(name);
+        Assert.assertEquals(expected, built);
+        switch (repetition) {
+          case REQUIRED:
+            built = Types.required(type).named(name);
+            break;
+          case OPTIONAL:
+            built = Types.optional(type).named(name);
+            break;
+          case REPEATED:
+            built = Types.repeated(type).named(name);
+            break;
+        }
+        Assert.assertEquals(expected, built);
+      }
+    }
+  }
+
+  @Test
+  public void testFixedTypeConstruction() {
+    String name = "fixed_";
+    int len = 5;
+    for (Type.Repetition repetition : Type.Repetition.values()) {
+      PrimitiveType expected = new PrimitiveType(
+          repetition, FIXED_LEN_BYTE_ARRAY, len, name);
+      PrimitiveType built = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+          .length(len).named(name);
+      Assert.assertEquals(expected, built);
+      switch (repetition) {
+        case REQUIRED:
+          built = Types.required(FIXED_LEN_BYTE_ARRAY).length(len).named(name);
+          break;
+        case OPTIONAL:
+          built = Types.optional(FIXED_LEN_BYTE_ARRAY).length(len).named(name);
+          break;
+        case REPEATED:
+          built = Types.repeated(FIXED_LEN_BYTE_ARRAY).length(len).named(name);
+          break;
+      }
+      Assert.assertEquals(expected, built);
+    }
+  }
+
+  @Test
+  public void testEmptyGroup() {
+    assertThrows("Should complain that required group is empty",
+        IllegalStateException.class, new Callable<Type>() {
+          @Override
+          public Type call() throws Exception {
+            return Types.requiredGroup().named("g");
+          }
+        });
+    assertThrows("Should complain that optional group is empty",
+        IllegalStateException.class, new Callable<Type>() {
+          @Override
+          public Type call() throws Exception {
+            return Types.optionalGroup().named("g");
+          }
+        });
+    assertThrows("Should complain that repeated group is empty",
+        IllegalStateException.class, new Callable<Type>() {
+          @Override
+          public Type call() throws Exception {
+            return Types.repeatedGroup().named("g");
+          }
+        });
+  }
+
+  @Test
+  @Ignore(value="Enforcing this breaks tests in parquet-thrift")
+  public void testEmptyMessage() {
+    assertThrows("Should complain that message is empty",
+        IllegalStateException.class, new Callable<Type>() {
+          @Override
+          public Type call() throws Exception {
+            return Types.buildMessage().named("m");
+          }
+        });
+  }
+
+  // FIXED_LEN_BYTE_ARRAY cannot be built without calling length(...).
+  @Test(expected=IllegalArgumentException.class)
+  public void testFixedWithoutLength() {
+    Types.required(FIXED_LEN_BYTE_ARRAY).named("fixed");
+  }
+
+  @Test
+  public void testFixedWithLength() {
+    PrimitiveType expected = new PrimitiveType(REQUIRED, FIXED_LEN_BYTE_ARRAY, 7, "fixed");
+    PrimitiveType fixed = Types.required(FIXED_LEN_BYTE_ARRAY).length(7).named("fixed");
+    Assert.assertEquals(expected, fixed);
+  }
+
+  @Test
+  public void testFixedLengthEquals() {
+    Type f4 = Types.required(FIXED_LEN_BYTE_ARRAY).length(4).named("f4");
+    Type f8 = Types.required(FIXED_LEN_BYTE_ARRAY).length(8).named("f8");
+    Assert.assertFalse("Types with different lengths should not be equal",
+        f4.equals(f8));
+  }
+
+  @Test
+  public void testDecimalAnnotation() {
+    // int32 primitive type
+    MessageType expected = new MessageType("DecimalMessage",
+        new PrimitiveType(REQUIRED, INT32, 0, "aDecimal",
+            DECIMAL, new DecimalMetadata(9, 2), null));
+    MessageType builderType = Types.buildMessage()
+        .required(INT32)
+            .as(DECIMAL).precision(9).scale(2)
+            .named("aDecimal")
+        .named("DecimalMessage");
+    Assert.assertEquals(expected, builderType);
+    // int64 primitive type
+    expected = new MessageType("DecimalMessage",
+        new PrimitiveType(REQUIRED, INT64, 0, "aDecimal",
+            DECIMAL, new DecimalMetadata(18, 2), null));
+    builderType = Types.buildMessage()
+        .required(INT64)
+            .as(DECIMAL).precision(18).scale(2)
+            .named("aDecimal")
+        .named("DecimalMessage");
+    Assert.assertEquals(expected, builderType);
+    // binary primitive type
+    expected = new MessageType("DecimalMessage",
+        new PrimitiveType(REQUIRED, BINARY, 0, "aDecimal",
+            DECIMAL, new DecimalMetadata(9, 2), null));
+    builderType = Types.buildMessage()
+        .required(BINARY).as(DECIMAL).precision(9).scale(2)
+            .named("aDecimal")
+        .named("DecimalMessage");
+    Assert.assertEquals(expected, builderType);
+    // fixed primitive type
+    expected = new MessageType("DecimalMessage",
+        new PrimitiveType(REQUIRED, FIXED_LEN_BYTE_ARRAY, 4, "aDecimal",
+            DECIMAL, new DecimalMetadata(9, 2), null));
+    builderType = Types.buildMessage()
+        .required(FIXED_LEN_BYTE_ARRAY).length(4)
+            .as(DECIMAL).precision(9).scale(2)
+            .named("aDecimal")
+        .named("DecimalMessage");
+    Assert.assertEquals(expected, builderType);
+  }
+
+  /** When scale(...) is not called, the built DECIMAL type has scale 0. */
+  @Test
+  public void testDecimalAnnotationMissingScale() {
+    MessageType expected = new MessageType("DecimalMessage",
+        new PrimitiveType(REQUIRED, INT32, 0, "aDecimal",
+            DECIMAL, new DecimalMetadata(9, 0), null));
+    MessageType builderType = Types.buildMessage()
+        .required(INT32)
+            .as(DECIMAL).precision(9)
+            .named("aDecimal")
+        .named("DecimalMessage");
+    Assert.assertEquals(expected, builderType);
+
+    expected = new MessageType("DecimalMessage",
+        new PrimitiveType(REQUIRED, INT64, 0, "aDecimal",
+            DECIMAL, new DecimalMetadata(9, 0), null));
+    builderType = Types.buildMessage()
+        .required(INT64)
+            .as(DECIMAL).precision(9)
+            .named("aDecimal")
+        .named("DecimalMessage");
+    Assert.assertEquals(expected, builderType);
+
+    expected = new MessageType("DecimalMessage",
+        new PrimitiveType(REQUIRED, BINARY, 0, "aDecimal",
+            DECIMAL, new DecimalMetadata(9, 0), null));
+    builderType = Types.buildMessage()
+        .required(BINARY).as(DECIMAL).precision(9)
+            .named("aDecimal")
+        .named("DecimalMessage");
+    Assert.assertEquals(expected, builderType);
+
+    expected = new MessageType("DecimalMessage",
+        new PrimitiveType(REQUIRED, FIXED_LEN_BYTE_ARRAY, 7, "aDecimal",
+            DECIMAL, new DecimalMetadata(9, 0), null));
+    builderType = Types.buildMessage()
+        .required(FIXED_LEN_BYTE_ARRAY).length(7)
+            .as(DECIMAL).precision(9)
+            .named("aDecimal")
+        .named("DecimalMessage");
+    Assert.assertEquals(expected, builderType);
+  }
+
+  @Test
+  public void testDecimalAnnotationMissingPrecision() {
+    assertThrows("Should reject decimal annotation without precision",
+        IllegalArgumentException.class, new Callable<Type>() {
+          @Override
+          public Type call() throws Exception {
+            return Types.buildMessage()
+                .required(INT32).as(DECIMAL).scale(2)
+                    .named("aDecimal")
+                .named("DecimalMessage");
+          }
+        });
+    assertThrows("Should reject decimal annotation without precision",
+        IllegalArgumentException.class, new Callable<Type>() {
+          @Override
+          public Type call() throws Exception {
+            return Types.buildMessage()
+                .required(INT64).as(DECIMAL).scale(2)
+                    .named("aDecimal")
+                .named("DecimalMessage");
+          }
+        });
+    assertThrows("Should reject decimal annotation without precision",
+        IllegalArgumentException.class, new Callable<Type>() {
+          @Override
+          public Type call() throws Exception {
+            return Types.buildMessage()
+                .required(BINARY).as(DECIMAL).scale(2)
+                    .named("aDecimal")
+                .named("DecimalMessage");
+          }
+        });
+    assertThrows("Should reject decimal annotation without precision",
+        IllegalArgumentException.class, new Callable<Type>() {
+          @Override
+          public Type call() throws Exception {
+            return Types.buildMessage()
+                .required(FIXED_LEN_BYTE_ARRAY).length(7)
+                    .as(DECIMAL).scale(2)
+                    .named("aDecimal")
+                .named("DecimalMessage");
+          }
+        });
+  }
+
+  @Test
+  public void testDecimalAnnotationPrecisionScaleBound() {
+    assertThrows("Should reject scale greater than precision",
+        IllegalArgumentException.class, new Callable<Type>() {
+          @Override
+          public Type call() throws Exception {
+            return Types.buildMessage()
+                .required(INT32).as(DECIMAL).precision(3).scale(4)
+                    .named("aDecimal")
+                .named("DecimalMessage");
+          }
+        });
+    assertThrows("Should reject scale greater than precision",
+        IllegalArgumentException.class, new Callable<Type>() {
+          @Override
+          public Type call() throws Exception {
+            return Types.buildMessage()
+                .required(INT64).as(DECIMAL).precision(3).scale(4)
+                    .named("aDecimal")
+                .named("DecimalMessage");
+          }
+        });
+    assertThrows("Should reject scale greater than precision",
+        IllegalArgumentException.class, new Callable<Type>() {
+          @Override
+          public Type call() throws Exception {
+            return Types.buildMessage()
+                .required(BINARY).as(DECIMAL).precision(3).scale(4)
+                    .named("aDecimal")
+                .named("DecimalMessage");
+          }
+        });
+    assertThrows("Should reject scale greater than precision",
+        IllegalArgumentException.class, new Callable<Type>() {
+          @Override
+          public Type call() throws Exception {
+            return Types.buildMessage()
+                .required(FIXED_LEN_BYTE_ARRAY).length(7)
+                    .as(DECIMAL).precision(3).scale(4)
+                    .named("aDecimal")
+                .named("DecimalMessage");
+          }
+        });
+  }
+
+  @Test
+  public void testDecimalAnnotationLengthCheck() {
+    // maximum precision for 4 bytes is 9 (2^31 - 1 has 10 digits, not all 10-digit values fit)
+    assertThrows("should reject precision 10 with length 4",
+        IllegalStateException.class, new Callable<Type>() {
+          @Override
+          public Type call() throws Exception {
+            return Types.required(FIXED_LEN_BYTE_ARRAY).length(4)
+                .as(DECIMAL).precision(10).scale(2)
+                .named("aDecimal");
+          }
+        });
+    assertThrows("should reject precision 10 with length 4",
+        IllegalStateException.class, new Callable<Type>() {
+          @Override
+          public Type call() throws Exception {
+            return Types.required(INT32)
+                .as(DECIMAL).precision(10).scale(2)
+                .named("aDecimal");
+          }
+        });
+    // maximum precision for 8 bytes is 18 (2^63 - 1 has 19 digits, not all 19-digit values fit)
+    assertThrows("should reject precision 19 with length 8",
+        IllegalStateException.class, new Callable<Type>() {
+          @Override
+          public Type call() throws Exception {
+            return Types.required(FIXED_LEN_BYTE_ARRAY).length(8)
+                .as(DECIMAL).precision(19).scale(4)
+                .named("aDecimal");
+          }
+        });
+    assertThrows("should reject precision 19 with length 8",
+        IllegalStateException.class, new Callable<Type>() {
+          @Override
+          public Type call() throws Exception {
+            // NOTE(review): length(8) on INT64 looks carried over from the fixed
+            // case above; confirm the builder ignores it for INT64.
+            return Types.required(INT64).length(8)
+                .as(DECIMAL).precision(19).scale(4)
+                .named("aDecimal");
+          }
+        });
+  }
+
+  @Test
+  public void testDECIMALAnnotationRejectsUnsupportedTypes() {
+    PrimitiveTypeName[] unsupported = new PrimitiveTypeName[]{
+        BOOLEAN, INT96, DOUBLE, FLOAT
+    };
+    for (final PrimitiveTypeName type : unsupported) {
+      assertThrows("Should reject unsupported type: " + type,
+          IllegalStateException.class, new Callable<Type>() {
+            @Override
+            public Type call() throws Exception {
+              return Types.required(type)
+                  .as(DECIMAL).precision(9).scale(2)
+                  .named("d");
+            }
+          });
+    }
+  }
+
+  @Test
+  public void testBinaryAnnotations() {
+    OriginalType[] types = new OriginalType[] {
+        UTF8, JSON, BSON};
+    for (OriginalType logicalType : types) {
+      PrimitiveType expected = new PrimitiveType(REQUIRED, BINARY, "col", logicalType);
+      PrimitiveType built = Types.required(BINARY).as(logicalType).named("col");
+      Assert.assertEquals(expected, built);
+    }
+  }
+
+  @Test
+  public void testBinaryAnnotationsRejectsNonBinary() {
+    OriginalType[] types = new OriginalType[] {
+        UTF8, JSON, BSON};
+    for (final OriginalType logicalType : types) {
+      PrimitiveTypeName[] nonBinary = new PrimitiveTypeName[]{
+          BOOLEAN, INT32, INT64, INT96, DOUBLE, FLOAT
+      };
+      for (final PrimitiveTypeName type : nonBinary) {
+        assertThrows("Should reject non-binary type: " + type,
+            IllegalStateException.class, new Callable<Type>() {
+              @Override
+              public Type call() throws Exception {
+                return Types.required(type).as(logicalType).named("col");
+              }
+            });
+      }
+      // FIXED_LEN_BYTE_ARRAY needs a length, so it is checked outside the loop.
+      assertThrows("Should reject non-binary type: FIXED_LEN_BYTE_ARRAY",
+          IllegalStateException.class, new Callable<Type>() {
+            @Override
+            public Type call() throws Exception {
+              return Types.required(FIXED_LEN_BYTE_ARRAY).length(1)
+                  .as(logicalType).named("col");
+            }
+          });
+    }
+  }
+
+  @Test
+  public void testInt32Annotations() {
+    OriginalType[] types = new OriginalType[] {
+        DATE, TIME_MILLIS, UINT_8, UINT_16, UINT_32, INT_8, INT_16, INT_32};
+    for (OriginalType logicalType : types) {
+      PrimitiveType expected = new PrimitiveType(REQUIRED, INT32, "col", logicalType);
+      PrimitiveType built = Types.required(INT32).as(logicalType).named("col");
+      Assert.assertEquals(expected, built);
+    }
+  }
+
+  @Test
+  public void testInt32AnnotationsRejectNonInt32() {
+    OriginalType[] types = new OriginalType[] {
+        DATE, TIME_MILLIS, UINT_8, UINT_16, UINT_32, INT_8, INT_16, INT_32};
+    for (final OriginalType logicalType : types) {
+      PrimitiveTypeName[] nonInt32 = new PrimitiveTypeName[]{
+          BOOLEAN, INT64, INT96, DOUBLE, FLOAT, BINARY
+      };
+      for (final PrimitiveTypeName type : nonInt32) {
+        assertThrows("Should reject non-int32 type: " + type,
+            IllegalStateException.class, new Callable<Type>() {
+              @Override
+              public Type call() throws Exception {
+                return Types.required(type).as(logicalType).named("col");
+              }
+            });
+      }
+      assertThrows("Should reject non-int32 type: FIXED_LEN_BYTE_ARRAY",
+          IllegalStateException.class, new Callable<Type>() {
+            @Override
+            public Type call() throws Exception {
+              return Types.required(FIXED_LEN_BYTE_ARRAY).length(1)
+                  .as(logicalType).named("col");
+            }
+          });
+    }
+  }
+
+  @Test
+  public void testInt64Annotations() {
+    OriginalType[] types = new OriginalType[] {
+        TIMESTAMP_MILLIS, UINT_64, INT_64};
+    for (OriginalType logicalType : types) {
+      PrimitiveType expected = new PrimitiveType(REQUIRED, INT64, "col", logicalType);
+      PrimitiveType built = Types.required(INT64).as(logicalType).named("col");
+      Assert.assertEquals(expected, built);
+    }
+  }
+
+  @Test
+  public void testInt64AnnotationsRejectNonInt64() {
+    OriginalType[] types = new OriginalType[] {
+        TIMESTAMP_MILLIS, UINT_64, INT_64};
+    for (final OriginalType logicalType : types) {
+      PrimitiveTypeName[] nonInt64 = new PrimitiveTypeName[]{
+          BOOLEAN, INT32, INT96, DOUBLE, FLOAT, BINARY
+      };
+      for (final PrimitiveTypeName type : nonInt64) {
+        assertThrows("Should reject non-int64 type: " + type,
+            IllegalStateException.class, new Callable<Type>() {
+              @Override
+              public Type call() throws Exception {
+                return Types.required(type).as(logicalType).named("col");
+              }
+            });
+      }
+      assertThrows("Should reject non-int64 type: FIXED_LEN_BYTE_ARRAY",
+          IllegalStateException.class, new Callable<Type>() {
+            @Override
+            public Type call() throws Exception {
+              return Types.required(FIXED_LEN_BYTE_ARRAY).length(1)
+                  .as(logicalType).named("col");
+            }
+          });
+    }
+  }
+
+  @Test
+  public void testIntervalAnnotation() {
+    PrimitiveType expected = new PrimitiveType(REQUIRED, FIXED_LEN_BYTE_ARRAY, 12, "interval", INTERVAL);
+    PrimitiveType built = Types.required(FIXED_LEN_BYTE_ARRAY).length(12)
+        .as(INTERVAL).named("interval");
+    Assert.assertEquals(expected, built);
+  }
+
+  @Test
+  public void testIntervalAnnotationRejectsNonFixed() {
+    PrimitiveTypeName[] nonFixed = new PrimitiveTypeName[]{
+        BOOLEAN, INT32, INT64, INT96, DOUBLE, FLOAT, BINARY
+    };
+    for (final PrimitiveTypeName type : nonFixed) {
+      assertThrows("Should reject non-fixed type: " + type,
+          IllegalStateException.class, new Callable<Type>() {
+            @Override
+            public Type call() throws Exception {
+              return Types.required(type).as(INTERVAL).named("interval");
+            }
+          });
+    }
+  }
+
+  @Test
+  public void testIntervalAnnotationRejectsNonFixed12() {
+    assertThrows("Should reject fixed with length != 12: " + 11,
+        IllegalStateException.class, new Callable<Type>() {
+          @Override
+          public Type call() throws Exception {
+            return Types.required(FIXED_LEN_BYTE_ARRAY).length(11)
+                .as(INTERVAL).named("interval");
+          }
+        });
+  }
+
+
+  /**
+   * A convenience method to avoid a large number of
+   * {@code @Test(expected=...)} tests.
+   *
+   * <p>The thrown exception must be exactly of class {@code expected};
+   * a subclass counts as a failure.
+   *
+   * @param message A String message to describe this assertion
+   * @param expected An Exception class that the Callable should throw
+   * @param callable A Callable that is expected to throw the exception
+   * @throws AssertionError if nothing is thrown, or if an exception of a
+   *     different class is thrown (that exception is attached as the cause)
+   */
+  public static void assertThrows(
+      String message, Class<? extends Exception> expected, Callable<?> callable) {
+    try {
+      callable.call();
+    } catch (Exception actual) {
+      if (!expected.equals(actual.getClass())) {
+        // Keep the unexpected exception as the cause so its stack trace is not lost.
+        AssertionError error = new AssertionError(message + ": expected " +
+            expected.getName() + " but was " + actual.getClass().getName());
+        error.initCause(actual);
+        throw error;
+      }
+      return;
+    }
+    Assert.fail("No exception was thrown (" + message + "), expected: " +
+        expected.getName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java b/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java
deleted file mode 100644
index 1cbc21d..0000000
--- a/parquet-column/src/test/java/parquet/column/impl/TestColumnReaderImpl.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.column.impl;
-
-import static junit.framework.Assert.assertEquals;
-import static parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
-
-import java.util.List;
-
-import org.junit.Test;
-
-import parquet.column.ColumnDescriptor;
-import parquet.column.ColumnReader;
-import parquet.column.ParquetProperties;
-import parquet.column.page.DataPage;
-import parquet.column.page.DataPageV2;
-import parquet.column.page.mem.MemPageReader;
-import parquet.column.page.mem.MemPageWriter;
-import parquet.io.api.Binary;
-import parquet.io.api.PrimitiveConverter;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
-
-/**
- * Tests ColumnReaderImpl against pages produced in memory by ColumnWriterV2
- * (writer version PARQUET_2_0) via MemPageWriter / MemPageReader.
- */
-public class TestColumnReaderImpl {
-
-  // Number of rows (and values, since the test schemas are not repeated) written and read back.
-  private int rows = 13001;
-
-  /**
-   * Converter that asserts the i-th binary value it receives is "bar" + (i % 10),
-   * the pattern test() writes, and counts the values it received.
-   */
-  private static final class ValidatingConverter extends PrimitiveConverter {
-    int count;
-
-    @Override
-    public void addBinary(Binary value) {
-      assertEquals("bar" + count % 10, value.toStringUsingUTF8());
-      ++ count;
-    }
-  }
-
-  /**
-   * Writes a required binary column, flushing a page every 1000 values, checks
-   * the summed page value and row counts, then reads every value back through
-   * ValidatingConverter.
-   */
-  @Test
-  public void test() {
-    MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
-    ColumnDescriptor col = schema.getColumns().get(0);
-    MemPageWriter pageWriter = new MemPageWriter();
-    ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
-    for (int i = 0; i < rows; i++) {
-      columnWriterV2.write(Binary.fromString("bar" + i % 10), 0, 0);
-      if ((i + 1) % 1000 == 0) {
-        // NOTE(review): i + 1 values have been written at this point, but writePage
-        // receives i (same pattern in testOptional); confirm whether writePage
-        // expects a cumulative row count here.
-        columnWriterV2.writePage(i);
-      }
-    }
-    // Flush the final partial page and close out the column chunk.
-    columnWriterV2.writePage(rows);
-    columnWriterV2.finalizeColumnChunk();
-    List<DataPage> pages = pageWriter.getPages();
-    int valueCount = 0;
-    int rowCount = 0;
-    for (DataPage dataPage : pages) {
-      valueCount += dataPage.getValueCount();
-      // The cast assumes every page from ColumnWriterV2 is a DataPageV2.
-      rowCount += ((DataPageV2)dataPage).getRowCount();
-    }
-    assertEquals(rows, rowCount);
-    assertEquals(rows, valueCount);
-    MemPageReader pageReader = new MemPageReader((long)rows, pages.iterator(), pageWriter.getDictionaryPage());
-    ValidatingConverter converter = new ValidatingConverter();
-    ColumnReader columnReader = new ColumnReaderImpl(col, pageReader, converter);
-    for (int i = 0; i < rows; i++) {
-      assertEquals(0, columnReader.getCurrentRepetitionLevel());
-      assertEquals(0, columnReader.getCurrentDefinitionLevel());
-      columnReader.writeCurrentValueToConverter();
-      columnReader.consume();
-    }
-    assertEquals(rows, converter.count);
-  }
-
-  /**
-   * Writes an optional binary column containing only nulls (writeNull with
-   * levels 0, 0) and checks that every value reads back with definition level 0.
-   * Values are only consumed, never sent to the converter, so its count stays 0.
-   */
-  @Test
-  public void testOptional() {
-    MessageType schema = MessageTypeParser.parseMessageType("message test { optional binary foo; }");
-    ColumnDescriptor col = schema.getColumns().get(0);
-    MemPageWriter pageWriter = new MemPageWriter();
-    ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
-    for (int i = 0; i < rows; i++) {
-      columnWriterV2.writeNull(0, 0);
-      if ((i + 1) % 1000 == 0) {
-        columnWriterV2.writePage(i);
-      }
-    }
-    columnWriterV2.writePage(rows);
-    columnWriterV2.finalizeColumnChunk();
-    List<DataPage> pages = pageWriter.getPages();
-    int valueCount = 0;
-    int rowCount = 0;
-    for (DataPage dataPage : pages) {
-      valueCount += dataPage.getValueCount();
-      rowCount += ((DataPageV2)dataPage).getRowCount();
-    }
-    assertEquals(rows, rowCount);
-    assertEquals(rows, valueCount);
-    MemPageReader pageReader = new MemPageReader((long)rows, pages.iterator(), pageWriter.getDictionaryPage());
-    ValidatingConverter converter = new ValidatingConverter();
-    ColumnReader columnReader = new ColumnReaderImpl(col, pageReader, converter);
-    for (int i = 0; i < rows; i++) {
-      assertEquals(0, columnReader.getCurrentRepetitionLevel());
-      assertEquals(0, columnReader.getCurrentDefinitionLevel());
-      columnReader.consume();
-    }
-    assertEquals(0, converter.count);
-  }
-
-}


Mime
View raw message