crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject crunch git commit: CRUNCH-552: Add support/tests for Parquet files w/Crunch on Spark
Date Tue, 28 Jul 2015 02:20:25 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 95e92fc89 -> 29d1ce4cd


CRUNCH-552: Add support/tests for Parquet files w/Crunch on Spark


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/29d1ce4c
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/29d1ce4c
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/29d1ce4c

Branch: refs/heads/master
Commit: 29d1ce4cddcddc86cda36bd464c26df4bdff3ae4
Parents: 95e92fc
Author: Josh Wills <jwills@apache.org>
Authored: Mon Jul 27 17:13:30 2015 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Mon Jul 27 19:07:12 2015 -0700

----------------------------------------------------------------------
 crunch-core/pom.xml                             |  17 --
 .../io/parquet/AvroParquetFileTarget.java       |   5 +-
 crunch-core/src/test/avro/employee.avsc         |  26 --
 crunch-core/src/test/avro/person.avsc           |  26 --
 .../crunch/SparkAvroParquetPipelineIT.java      | 242 +++++++++++++++++++
 .../apache/crunch/impl/spark/SparkRuntime.java  |   1 +
 crunch-test/pom.xml                             |  17 ++
 crunch-test/src/main/avro/employee.avsc         |  26 ++
 crunch-test/src/main/avro/person.avsc           |  26 ++
 9 files changed, 314 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/29d1ce4c/crunch-core/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-core/pom.xml b/crunch-core/pom.xml
index 59794f0..d88a652 100644
--- a/crunch-core/pom.xml
+++ b/crunch-core/pom.xml
@@ -176,23 +176,6 @@ under the License.
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-failsafe-plugin</artifactId>
       </plugin>
-      <plugin>
-        <groupId>org.apache.avro</groupId>
-        <artifactId>avro-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>schemas</id>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>schema</goal>
-            </goals>
-            <configuration>
-              <testSourceDirectory>${project.basedir}/src/test/avro/</testSourceDirectory>
-              <testOutputDirectory>target/generated-test-sources/</testOutputDirectory>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
     </plugins>
   </build>
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/29d1ce4c/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
index 3c2847d..5fb4c53 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
@@ -107,7 +107,7 @@ public class AvroParquetFileTarget extends FileTargetImpl {
     return null;
   }
 
-  static class CrunchAvroWriteSupport extends AvroWriteSupport {
+  public static class CrunchAvroWriteSupport extends AvroWriteSupport {
     @Override
     public WriteContext init(Configuration conf) {
       String outputName = conf.get("crunch.namedoutput");
@@ -119,8 +119,7 @@ public class AvroParquetFileTarget extends FileTargetImpl {
     }
   }
 
-  static class CrunchAvroParquetOutputFormat extends ParquetOutputFormat<IndexedRecord>
{
-
+  public static class CrunchAvroParquetOutputFormat extends ParquetOutputFormat<IndexedRecord>
{
     public CrunchAvroParquetOutputFormat() {
       super(new CrunchAvroWriteSupport());
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/29d1ce4c/crunch-core/src/test/avro/employee.avsc
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/avro/employee.avsc b/crunch-core/src/test/avro/employee.avsc
deleted file mode 100644
index 35726e1..0000000
--- a/crunch-core/src/test/avro/employee.avsc
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-{
-"namespace": "org.apache.crunch.test",
-"name": "Employee",
-"type": "record",
-"fields": [
-  {"name": "name", "type": ["string", "null"] },
-  {"name": "salary", "type": "int"},
-  {"name": "department", "type": ["string", "null"] } ]
-} 

http://git-wip-us.apache.org/repos/asf/crunch/blob/29d1ce4c/crunch-core/src/test/avro/person.avsc
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/avro/person.avsc b/crunch-core/src/test/avro/person.avsc
deleted file mode 100644
index eb24071..0000000
--- a/crunch-core/src/test/avro/person.avsc
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-{
-"namespace": "org.apache.crunch.test",
-"name": "Person",
-"type": "record",
-"fields": [
-  {"name": "name", "type": ["string", "null"] },
-  {"name": "age", "type": "int"},
-  {"name": "siblingnames", "type" : ["null", { "type": "array", "items": "string" }], "default":
null } ]
-} 

http://git-wip-us.apache.org/repos/asf/crunch/blob/29d1ce4c/crunch-spark/src/it/java/org/apache/crunch/SparkAvroParquetPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkAvroParquetPipelineIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkAvroParquetPipelineIT.java
new file mode 100644
index 0000000..f5e2c25
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkAvroParquetPipelineIT.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.Lists;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.parquet.AvroParquetFileSource;
+import org.apache.crunch.io.parquet.AvroParquetFileSourceTarget;
+import org.apache.crunch.io.parquet.AvroParquetFileTarget;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.Employee;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import parquet.avro.AvroParquetReader;
+import parquet.avro.AvroParquetWriter;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class SparkAvroParquetPipelineIT implements Serializable {
+
+  private transient File avroFile;
+
+  @Rule
+  public transient TemporaryPath tmpDir = new TemporaryPath(RuntimeParameters.TMP_DIR, "hadoop.tmp.dir");
+
+  @Before
+  public void setUp() throws IOException {
+    avroFile = tmpDir.getFile("test.avro.parquet");
+  }
+
+  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema)
throws IOException {
+    FileOutputStream outputStream = new FileOutputStream(this.avroFile);
+    GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(schema);
+
+    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter);
+    dataFileWriter.create(schema, outputStream);
+
+    for (GenericRecord record : genericRecords) {
+      dataFileWriter.append(record);
+    }
+
+    dataFileWriter.close();
+    outputStream.close();
+  }
+
+  private void populateGenericParquetFile(List<GenericRecord> genericRecords, Schema
schema) throws IOException {
+    AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>(
+            new Path(avroFile.getPath()), schema);
+
+    for (GenericRecord record : genericRecords) {
+      writer.write(record);
+    }
+
+    writer.close();
+  }
+
+  @Test
+  public void toAvroParquetFileTarget() throws Exception {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new SparkPipeline("local", "avroparq");
+    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+            Avros.records(Person.class)));
+    File outputFile = tmpDir.getFile("output");
+    Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath());
+    pipeline.write(genericCollection, parquetFileTarget);
+    pipeline.run();
+
+    Person person = genericCollection.materialize().iterator().next();
+
+    Path parquetFile = new Path(new File(outputFile, "part-r-00000.parquet").getPath());
+
+    AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile);
+
+    try {
+      Person readPerson = reader.read();
+      assertThat(readPerson, is(person));
+    } finally {
+      reader.close();
+      pipeline.done();
+    }
+  }
+
+  @Test
+  public void toAvroParquetFileTargetFromParquet() throws Exception {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericParquetFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new SparkPipeline("local", "avroparq");
+    PCollection<Person> genericCollection = pipeline.read(
+            new AvroParquetFileSource<Person>(new Path(avroFile.getAbsolutePath()),
Avros.records(Person.class)));
+    File outputFile = tmpDir.getFile("output");
+    Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath());
+    pipeline.write(genericCollection, parquetFileTarget);
+    pipeline.run();
+
+    Person person = genericCollection.materialize().iterator().next();
+
+    Path parquetFile = new Path(new File(outputFile, "part-r-00000.parquet").getPath());
+
+    AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile);
+
+    try {
+      Person readPerson = reader.read();
+      assertThat(readPerson, is(person));
+    } finally {
+      reader.close();
+      pipeline.done();
+    }
+  }
+
+  @Test
+  public void toAvroParquetFileMultipleTarget() throws Exception {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new SparkPipeline("local", "avroparq");
+    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+            Avros.records(Person.class)));
+
+    PCollection<Employee> employees = genericCollection.parallelDo(new DoFn<Person,
Employee>() {
+      @Override
+      public void process(Person person, Emitter<Employee> emitter) {
+        emitter.emit(new Employee(person.getName(), 0, "Eng"));
+      }
+    }, Avros.records(Employee.class));
+
+    File output1File = tmpDir.getFile("output1");
+    File output2File = tmpDir.getFile("output2");
+    pipeline.write(genericCollection, new AvroParquetFileTarget(output1File.getAbsolutePath()));
+    pipeline.write(employees, new AvroParquetFileSourceTarget(new Path(output2File.getAbsolutePath()),
+            Avros.records(Employee.class)));
+    pipeline.run();
+
+    Person person = genericCollection.materialize().iterator().next();
+    Employee employee = employees.materialize().iterator().next();
+
+    Path parquet1File = new Path(new File(output1File, "part-r-00000.parquet").getPath());
+    Path parquet2File = new Path(new File(output2File, "part-r-00000.parquet").getPath());
+
+    AvroParquetReader<Person> personReader = new AvroParquetReader<Person>(parquet1File);
+
+    try {
+      Person readPerson = personReader.read();
+      assertThat(readPerson, is(person));
+    } finally {
+      personReader.close();
+      pipeline.done();
+    }
+
+    AvroParquetReader<Employee> employeeReader = new AvroParquetReader<Employee>(parquet2File);
+
+    try {
+      Employee readEmployee = employeeReader.read();
+      assertThat(readEmployee, is(employee));
+    } finally {
+      employeeReader.close();
+    }
+
+  }
+
+  @Test
+  public void toAvroParquetFileTargetReadSource() throws Exception {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new SparkPipeline("local", "avroparq");
+    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+            Avros.records(Person.class)));
+    File outputFile = tmpDir.getFile("output");
+    Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath());
+    pipeline.write(genericCollection, parquetFileTarget);
+    pipeline.run();
+
+    Person person = genericCollection.materialize().iterator().next();
+
+    PCollection<Person> retrievedPeople = pipeline.read(new AvroParquetFileSource<Person>(
+            new Path(outputFile.toURI()), Avros.records(Person.class)));
+
+    Person retrievedPerson = retrievedPeople.materialize().iterator().next();
+
+    assertThat(retrievedPerson, is(person));
+
+    Path parquetFile = new Path(new File(outputFile, "part-r-00000.parquet").getPath());
+
+    AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile);
+
+    try {
+      Person readPerson = reader.read();
+      assertThat(readPerson, is(person));
+    } finally {
+      reader.close();
+      pipeline.done();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/29d1ce4c/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
index 5798e4c..f1dce0b 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -331,6 +331,7 @@ public class SparkRuntime extends AbstractFuture<PipelineResult>
implements Pipe
                 job.setOutputKeyClass(outConfig.keyClass);
                 job.setOutputValueClass(outConfig.valueClass);
                 outConfig.bundle.configure(job.getConfiguration());
+                job.getConfiguration().set("crunch.namedoutput", "out0");
                 Path tmpPath = pipeline.createTempPath();
                 outRDD.saveAsNewAPIHadoopFile(
                     tmpPath.toString(),

http://git-wip-us.apache.org/repos/asf/crunch/blob/29d1ce4c/crunch-test/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-test/pom.xml b/crunch-test/pom.xml
index bb6f493..a023066 100644
--- a/crunch-test/pom.xml
+++ b/crunch-test/pom.xml
@@ -89,6 +89,23 @@ under the License.
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-failsafe-plugin</artifactId>
       </plugin>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>schemas</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+            <configuration>
+              <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+              <outputDirectory>target/generated-sources/</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/29d1ce4c/crunch-test/src/main/avro/employee.avsc
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/avro/employee.avsc b/crunch-test/src/main/avro/employee.avsc
new file mode 100644
index 0000000..35726e1
--- /dev/null
+++ b/crunch-test/src/main/avro/employee.avsc
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+"namespace": "org.apache.crunch.test",
+"name": "Employee",
+"type": "record",
+"fields": [
+  {"name": "name", "type": ["string", "null"] },
+  {"name": "salary", "type": "int"},
+  {"name": "department", "type": ["string", "null"] } ]
+} 

http://git-wip-us.apache.org/repos/asf/crunch/blob/29d1ce4c/crunch-test/src/main/avro/person.avsc
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/avro/person.avsc b/crunch-test/src/main/avro/person.avsc
new file mode 100644
index 0000000..eb24071
--- /dev/null
+++ b/crunch-test/src/main/avro/person.avsc
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+"namespace": "org.apache.crunch.test",
+"name": "Person",
+"type": "record",
+"fields": [
+  {"name": "name", "type": ["string", "null"] },
+  {"name": "age", "type": "int"},
+  {"name": "siblingnames", "type" : ["null", { "type": "array", "items": "string" }], "default":
null } ]
+} 


Mime
View raw message