crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject git commit: CRUNCH-277. Support Parquet.
Date Wed, 09 Oct 2013 11:05:17 GMT
Updated Branches:
  refs/heads/master 910b6afbe -> f47e778b7


CRUNCH-277. Support Parquet.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f47e778b
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f47e778b
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f47e778b

Branch: refs/heads/master
Commit: f47e778b78e7d667556cfa0f9ff0c3a936e91e3e
Parents: 910b6af
Author: Tom White <tomwhite@apache.org>
Authored: Wed Oct 9 12:04:43 2013 +0100
Committer: Tom White <tomwhite@apache.org>
Committed: Wed Oct 9 12:04:43 2013 +0100

----------------------------------------------------------------------
 crunch-core/pom.xml                             |   5 +
 .../parquet/AvroParquetFileSourceTargetIT.java  | 114 +++++++++
 .../io/parquet/AvroParquetPipelineIT.java       | 237 +++++++++++++++++++
 .../crunch/io/parquet/AvroParquetConverter.java |  59 +++++
 .../parquet/AvroParquetFileReaderFactory.java   | 100 ++++++++
 .../io/parquet/AvroParquetFileSource.java       |  69 ++++++
 .../io/parquet/AvroParquetFileSourceTarget.java |  41 ++++
 .../io/parquet/AvroParquetFileTarget.java       | 119 ++++++++++
 .../AvroParquetFileReaderFactoryTest.java       | 104 ++++++++
 pom.xml                                         |  13 +
 10 files changed, 861 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-core/pom.xml b/crunch-core/pom.xml
index 129990f..8a5beb6 100644
--- a/crunch-core/pom.xml
+++ b/crunch-core/pom.xml
@@ -54,6 +54,11 @@ under the License.
     </dependency>
 
     <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-avro</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.javassist</groupId>
       <artifactId>javassist</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
new file mode 100644
index 0000000..b6d51f2
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import parquet.avro.AvroParquetWriter;
+
+import static org.junit.Assert.assertEquals;
+
+@SuppressWarnings("serial")
+public class AvroParquetFileSourceTargetIT implements Serializable {
+
+  private transient File avroFile;
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Before
+  public void setUp() throws IOException {
+    avroFile = tmpDir.getFile("test.avro.parquet");
+  }
+
+  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+    AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>(
+        new Path(avroFile.getPath()), schema);
+
+    for (GenericRecord record : genericRecords) {
+      writer.write(record);
+    }
+
+    writer.close();
+  }
+
+  @Test
+  public void testSpecific() throws IOException {
+    GenericRecord savedRecord = new Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(new AvroParquetFileSource<Person>(new Path(avroFile.getAbsolutePath()),
+        Avros.records(Person.class)));
+
+    List<Person> personList = Lists.newArrayList(genericCollection.materialize());
+
+    Person expectedPerson = new Person();
+    expectedPerson.name = "John Doe";
+    expectedPerson.age = 42;
+
+    List<CharSequence> siblingNames = Lists.newArrayList();
+    siblingNames.add("Jimmy");
+    siblingNames.add("Jane");
+    expectedPerson.siblingnames = siblingNames;
+
+    assertEquals(Lists.newArrayList(expectedPerson), Lists.newArrayList(personList));
+  }
+
+  @Test
+  public void testGeneric() throws IOException {
+    String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", "GenericPerson");
+    Schema genericPersonSchema = new Schema.Parser().parse(genericSchemaJson);
+    GenericRecord savedRecord = new Record(genericPersonSchema);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), genericPersonSchema);
+
+    Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Record> genericCollection = pipeline.read(new AvroParquetFileSource<Record>(new Path
+        (avroFile.getAbsolutePath()),
+        Avros.generics(genericPersonSchema)));
+
+    List<Record> recordList = Lists.newArrayList(genericCollection.materialize());
+
+    assertEquals(Lists.newArrayList(savedRecord), Lists.newArrayList(recordList));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java
new file mode 100644
index 0000000..055d0d7
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.Employee;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import parquet.avro.AvroParquetReader;
+import parquet.avro.AvroParquetWriter;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class AvroParquetPipelineIT implements Serializable {
+
+  private transient File avroFile;
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Before
+  public void setUp() throws IOException {
+    avroFile = tmpDir.getFile("test.avro.parquet");
+  }
+
+  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+    FileOutputStream outputStream = new FileOutputStream(this.avroFile);
+    GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(schema);
+
+    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter);
+    dataFileWriter.create(schema, outputStream);
+
+    for (GenericRecord record : genericRecords) {
+      dataFileWriter.append(record);
+    }
+
+    dataFileWriter.close();
+    outputStream.close();
+  }
+
+  private void populateGenericParquetFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+    AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>(
+        new Path(avroFile.getPath()), schema);
+
+    for (GenericRecord record : genericRecords) {
+      writer.write(record);
+    }
+
+    writer.close();
+  }
+
+  @Test
+  public void toAvroParquetFileTarget() throws Exception {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.records(Person.class)));
+    File outputFile = tmpDir.getFile("output");
+    Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath());
+    pipeline.write(genericCollection, parquetFileTarget);
+    pipeline.run();
+
+    Person person = genericCollection.materialize().iterator().next();
+
+    Path parquetFile = new Path(new File(outputFile, "part-m-00000.parquet").getPath());
+
+    AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile);
+
+    try {
+      Person readPerson = reader.read();
+      assertThat(readPerson, is(person));
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void toAvroParquetFileTargetFromParquet() throws Exception {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericParquetFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(
+        new AvroParquetFileSource<Person>(new Path(avroFile.getAbsolutePath()), Avros.records(Person.class)));
+    File outputFile = tmpDir.getFile("output");
+    Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath());
+    pipeline.write(genericCollection, parquetFileTarget);
+    pipeline.run();
+
+    Person person = genericCollection.materialize().iterator().next();
+
+    Path parquetFile = new Path(new File(outputFile, "part-m-00000.parquet").getPath());
+
+    AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile);
+
+    try {
+      Person readPerson = reader.read();
+      assertThat(readPerson, is(person));
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void toAvroParquetFileMultipleTarget() throws Exception {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.records(Person.class)));
+
+    PCollection<Employee> employees = genericCollection.parallelDo(new DoFn<Person, Employee>() {
+      @Override
+      public void process(Person person, Emitter<Employee> emitter) {
+        emitter.emit(new Employee(person.getName(), 0, "Eng"));
+      }
+    }, Avros.records(Employee.class));
+
+    File output1File = tmpDir.getFile("output1");
+    File output2File = tmpDir.getFile("output2");
+    pipeline.write(genericCollection, new AvroParquetFileTarget(output1File.getAbsolutePath()));
+    pipeline.write(employees, new AvroParquetFileSourceTarget(new Path(output2File.getAbsolutePath()),
+        Avros.records(Employee.class)));
+    pipeline.run();
+
+    Person person = genericCollection.materialize().iterator().next();
+    Employee employee = employees.materialize().iterator().next();
+
+    Path parquet1File = new Path(new File(output1File, "part-m-00000.parquet").getPath());
+    Path parquet2File = new Path(new File(output2File, "part-m-00000.parquet").getPath());
+
+    AvroParquetReader<Person> personReader = new AvroParquetReader<Person>(parquet1File);
+
+    try {
+      Person readPerson = personReader.read();
+      assertThat(readPerson, is(person));
+    } finally {
+      personReader.close();
+    }
+
+    AvroParquetReader<Employee> employeeReader = new AvroParquetReader<Employee>(parquet2File);
+
+    try {
+      Employee readEmployee = employeeReader.read();
+      assertThat(readEmployee, is(employee));
+    } finally {
+      employeeReader.close();
+    }
+
+  }
+
+  @Test
+  public void toAvroParquetFileTargetReadSource() throws Exception {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.records(Person.class)));
+    File outputFile = tmpDir.getFile("output");
+    Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath());
+    pipeline.write(genericCollection, parquetFileTarget);
+    pipeline.run();
+
+    Person person = genericCollection.materialize().iterator().next();
+
+    PCollection<Person> retrievedPeople = pipeline.read(new AvroParquetFileSource<Person>(
+        new Path(outputFile.toURI()), Avros.records(Person.class)));
+
+    Person retrievedPerson = retrievedPeople.materialize().iterator().next();
+
+    assertThat(retrievedPerson, is(person));
+
+    Path parquetFile = new Path(new File(outputFile, "part-m-00000.parquet").getPath());
+
+    AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile);
+
+    try {
+      Person readPerson = reader.read();
+      assertThat(readPerson, is(person));
+    } finally {
+      reader.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetConverter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetConverter.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetConverter.java
new file mode 100644
index 0000000..5cb231f
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetConverter.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.avro.AvroType;
+
+class AvroParquetConverter<T> implements Converter<Void, T, T, Iterable<T>> {
+  private AvroType<T> ptype;
+
+  public AvroParquetConverter(AvroType<T> ptype) {
+    this.ptype = ptype;
+  }
+
+  @Override
+  public T convertInput(Void key, T value) {
+    return value;
+  }
+
+  @Override
+  public Iterable<T> convertIterableInput(Void key, Iterable<T> value) {
+    return value;
+  }
+
+  @Override
+  public Void outputKey(T value) {
+    return null;
+  }
+
+  @Override
+  public T outputValue(T value) {
+    return value;
+  }
+
+  @Override
+  public Class<Void> getKeyClass() {
+    return Void.class;
+  }
+
+  @Override
+  public Class<T> getValueClass() {
+    return ptype.getTypeClass();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactory.java
new file mode 100644
index 0000000..c193563
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactory.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import com.google.common.collect.UnmodifiableIterator;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.AutoClosingIterator;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import parquet.avro.AvroReadSupport;
+import parquet.hadoop.ParquetReader;
+import parquet.schema.MessageType;
+
+class AvroParquetFileReaderFactory<T> implements FileReaderFactory<T> {
+
+  private AvroType<T> avroType;
+
+  public AvroParquetFileReaderFactory(AvroType<T> avroType) {
+    this.avroType = avroType;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Iterator<T> read(FileSystem fs, Path path) {
+    Path p = fs.makeQualified(path);
+    final ParquetReader reader;
+    try {
+      reader = new ParquetReader(p, new CrunchAvroReadSupport(avroType));
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    }
+    return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
+
+      private T next;
+
+      @Override
+      public boolean hasNext() {
+        if (next != null) {
+          return true;
+        }
+        try {
+          next = (T) reader.read();
+        } catch (IOException e) {
+          throw new CrunchRuntimeException(e);
+        }
+        return next != null;
+      }
+
+      @Override
+      public T next() {
+        if (hasNext()) {
+          T ret = next;
+          next = null;
+          return ret;
+        }
+        throw new NoSuchElementException();
+      }
+    });
+
+  }
+
+  static class CrunchAvroReadSupport<T extends IndexedRecord> extends AvroReadSupport<T> {
+    private AvroType<T> avroType;
+
+    public CrunchAvroReadSupport(AvroType<T> avroType) {
+      this.avroType = avroType;
+    }
+
+    @Override
+    public ReadContext init(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema) {
+      if (avroType != null) {
+        setRequestedProjection(configuration, avroType.getSchema());
+      }
+      return super.init(configuration, keyValueMetaData, fileSchema);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
new file mode 100644
index 0000000..81678d4
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import parquet.avro.AvroParquetInputFormat;
+import parquet.avro.AvroReadSupport;
+
+public class AvroParquetFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
+
+  private static <S> FormatBundle<AvroParquetInputFormat> getBundle(AvroType<S> ptype) {
+    return FormatBundle.forInput(AvroParquetInputFormat.class)
+        .set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, ptype.getSchema().toString())
+        // ParquetRecordReader expects ParquetInputSplits, not FileSplits, so it
+        // doesn't work with CombineFileInputFormat
+        .set(RuntimeParameters.DISABLE_COMBINE_FILE, "true");
+  }
+
+  public AvroParquetFileSource(Path path, AvroType<T> ptype) {
+    super(path, ptype, getBundle(ptype));
+  }
+
+  public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype) {
+    super(paths, ptype, getBundle(ptype));
+  }
+
+  @Override
+  public Iterable<T> read(Configuration conf) throws IOException {
+    return read(conf, getFileReaderFactory((AvroType<T>) ptype));
+  }
+
+  protected AvroParquetFileReaderFactory<T> getFileReaderFactory(AvroType<T> ptype){
+    return new AvroParquetFileReaderFactory<T>(ptype);
+  }
+
+  @Override
+  public Converter<?, ?, ?, ?> getConverter() {
+    return new AvroParquetConverter<T>((AvroType<T>) ptype);
+  }
+
+  @Override
+  public String toString() {
+    return "Parquet(" + pathsAsString() + ")";
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java
new file mode 100644
index 0000000..8d93eba
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.hadoop.fs.Path;
+
+public class AvroParquetFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
+
+  public AvroParquetFileSourceTarget(Path path, AvroType<T> atype) {
+    this(path, atype, SequentialFileNamingScheme.getInstance());
+  }
+
+  public AvroParquetFileSourceTarget(Path path, AvroType<T> atype, FileNamingScheme fileNamingScheme) {
+    super(new AvroParquetFileSource<T>(path, atype), new AvroParquetFileTarget(path),
+        fileNamingScheme);
+  }
+
+  @Override
+  public String toString() {
+    return target.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
new file mode 100644
index 0000000..c67b9f1
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import parquet.avro.AvroWriteSupport;
+import parquet.hadoop.ParquetOutputFormat;
+
+public class AvroParquetFileTarget extends FileTargetImpl {
+
+  private static final String PARQUET_AVRO_SCHEMA_PARAMETER = "parquet.avro.schema";
+
+  public AvroParquetFileTarget(String path) {
+    this(new Path(path));
+  }
+
+  public AvroParquetFileTarget(Path path) {
+    this(path, SequentialFileNamingScheme.getInstance());
+  }
+
+  public AvroParquetFileTarget(Path path, FileNamingScheme fileNamingScheme) {
+    super(path, CrunchAvroParquetOutputFormat.class, fileNamingScheme);
+  }
+
+  @Override
+  public String toString() {
+    return "Parquet(" + path.toString() + ")";
+  }
+
+  @Override
+  public boolean accept(OutputHandler handler, PType<?> ptype) {
+    if (!(ptype instanceof AvroType)) {
+      return false;
+    }
+    handler.configure(this, ptype);
+    return true;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Converter<?, ?, ?, ?> getConverter(PType<?> ptype) {
+    return new AvroParquetConverter<Object>((AvroType<Object>) ptype);
+  }
+
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    AvroType<?> atype = (AvroType<?>) ptype;
+    Configuration conf = job.getConfiguration();
+    String schemaParam;
+    if (name == null) {
+      schemaParam = PARQUET_AVRO_SCHEMA_PARAMETER;
+    } else {
+      schemaParam = PARQUET_AVRO_SCHEMA_PARAMETER + "." + name;
+    }
+    String outputSchema = conf.get(schemaParam);
+    if (outputSchema == null) {
+      conf.set(schemaParam, atype.getSchema().toString());
+    } else if (!outputSchema.equals(atype.getSchema().toString())) {
+      throw new IllegalStateException("Avro targets must use the same output schema");
+    }
+    configureForMapReduce(job, Void.class, atype.getTypeClass(),
+        CrunchAvroParquetOutputFormat.class, outputPath, name);
+  }
+
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    if (ptype instanceof AvroType) {
+      return new AvroParquetFileSourceTarget<T>(path, (AvroType<T>) ptype);
+    }
+    return null;
+  }
+
+  static class CrunchAvroWriteSupport extends AvroWriteSupport {
+    @Override
+    public WriteContext init(Configuration conf) {
+      String outputName = conf.get("crunch.namedoutput");
+      if (outputName != null && !outputName.isEmpty()) {
+        String schema = conf.get(PARQUET_AVRO_SCHEMA_PARAMETER + "." + outputName);
+        setSchema(conf, new Schema.Parser().parse(schema));
+      }
+      return super.init(conf);
+    }
+  }
+
+  static class CrunchAvroParquetOutputFormat extends ParquetOutputFormat<IndexedRecord> {
+
+    public CrunchAvroParquetOutputFormat() {
+      super(new CrunchAvroWriteSupport());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/test/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactoryTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactoryTest.java b/crunch-core/src/test/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactoryTest.java
new file mode 100644
index 0000000..9f5ff70
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactoryTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import parquet.avro.AvroParquetWriter;
+
+public class AvroParquetFileReaderFactoryTest {
+
+  private File parquetFile;
+
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Before
+  public void setUp() throws IOException {
+    parquetFile = tmpDir.getFile("test.avro.parquet");
+  }
+
+  @After
+  public void tearDown() {
+    parquetFile.delete();
+  }
+
+  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema)
throws IOException {
+    AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>(
+        new Path(parquetFile.getPath()), schema);
+
+    for (GenericRecord record : genericRecords) {
+      writer.write(record);
+    }
+
+    writer.close();
+  }
+
+  private <T> AvroParquetFileReaderFactory<T> createFileReaderFactory(AvroType<T>
avroType) {
+    return new AvroParquetFileReaderFactory<T>(avroType);
+  }
+
+  @Test
+  public void testProjection() throws IOException {
+    String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", "GenericPerson");
+    Schema genericPersonSchema = new Schema.Parser().parse(genericSchemaJson);
+    GenericRecord savedRecord = new Record(genericPersonSchema);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), genericPersonSchema);
+
+    Schema projection = Schema.createRecord("projection", null, null, false);
+    projection.setFields(Lists.newArrayList(cloneField(genericPersonSchema.getField("name"))));
+    AvroParquetFileReaderFactory<Record> genericReader = createFileReaderFactory(Avros.generics(projection));
+    Iterator<Record> recordIterator = genericReader.read(FileSystem.getLocal(new Configuration()),
+        new Path(this.parquetFile.getAbsolutePath()));
+
+    GenericRecord genericRecord = recordIterator.next();
+    assertEquals(savedRecord.get("name"), genericRecord.get("name"));
+    assertNull(genericRecord.get("age"));
+    assertFalse(recordIterator.hasNext());
+  }
+
+  public static Schema.Field cloneField(Schema.Field field) {
+    return new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ba3258c..45283f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,6 +73,7 @@ under the License.
     <commons-logging.version>1.1.1</commons-logging.version>
     <commons-cli.version>1.2</commons-cli.version>
     <avro.version>1.7.4</avro.version>
+    <parquet.version>1.2.0</parquet.version>
     <javassist.version>3.16.1-GA</javassist.version>
     <jackson.version>1.8.8</jackson.version>
     <protobuf-java.version>2.4.0a</protobuf-java.version>
@@ -209,6 +210,18 @@ under the License.
       </dependency>
 
       <dependency>
+        <groupId>com.twitter</groupId>
+        <artifactId>parquet-avro</artifactId>
+        <version>${parquet.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-core</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
+      <dependency>
         <groupId>org.javassist</groupId>
         <artifactId>javassist</artifactId>
         <version>${javassist.version}</version>


Mime
View raw message