Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 76ED0103AC for ; Tue, 28 Jan 2014 03:18:37 +0000 (UTC) Received: (qmail 65757 invoked by uid 500); 28 Jan 2014 03:18:36 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 65717 invoked by uid 500); 28 Jan 2014 03:18:36 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 65706 invoked by uid 99); 28 Jan 2014 03:18:35 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Jan 2014 03:18:35 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9FD45907FC6; Tue, 28 Jan 2014 03:18:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: <32c08d135c29427db2c3104f91d56c62@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-333: From and At methods to support reading Avro files using the on-disk schema Date: Tue, 28 Jan 2014 03:18:34 +0000 (UTC) Updated Branches: refs/heads/apache-crunch-0.8 243d2ff27 -> 5f6651db1 CRUNCH-333: From and At methods to support reading Avro files using the on-disk schema Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5f6651db Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5f6651db Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5f6651db Branch: refs/heads/apache-crunch-0.8 Commit: 5f6651db15532e643da08ee3eb5acb4173f7c832 Parents: 243d2ff Author: Josh Wills 
Authored: Mon Jan 27 16:50:11 2014 -0800 Committer: Josh Wills Committed: Mon Jan 27 19:18:18 2014 -0800 ---------------------------------------------------------------------- .../crunch/io/avro/AvroFileSourceTargetIT.java | 20 +++++ .../src/main/java/org/apache/crunch/io/At.java | 43 +++++++++- .../main/java/org/apache/crunch/io/From.java | 86 +++++++++++++++++++- 3 files changed, 146 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/5f6651db/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java index 671b920..9f51f23 100644 --- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java @@ -36,11 +36,13 @@ import org.apache.crunch.PCollection; import org.apache.crunch.Pipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.At; +import org.apache.crunch.io.From; import org.apache.crunch.test.Person; import org.apache.crunch.test.StringWrapper; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.fs.Path; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -102,6 +104,24 @@ public class AvroFileSourceTargetIT implements Serializable { } @Test + public void testReadAsGeneric() throws IOException { + GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$); + savedRecord.put("name", "John Doe"); + savedRecord.put("age", 42); + savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane")); + 
populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$); + + Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration()); + PCollection genericCollection = pipeline.read(From.avroFile( + new Path(avroFile.getAbsolutePath()), + tmpDir.getDefaultConfiguration())); + + List personList = Lists.newArrayList(genericCollection.materialize()); + + assertEquals(Lists.newArrayList(savedRecord), Lists.newArrayList(personList)); + } + + @Test public void testGeneric() throws IOException { String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", "GenericPerson"); Schema genericPersonSchema = new Schema.Parser().parse(genericSchemaJson); http://git-wip-us.apache.org/repos/asf/crunch/blob/5f6651db/crunch-core/src/main/java/org/apache/crunch/io/At.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/At.java b/crunch-core/src/main/java/org/apache/crunch/io/At.java index a6f0782..e60831a 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/At.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/At.java @@ -17,7 +17,9 @@ */ package org.apache.crunch.io; +import org.apache.avro.generic.GenericData; import org.apache.avro.specific.SpecificRecord; +import org.apache.crunch.Source; import org.apache.crunch.SourceTarget; import org.apache.crunch.TableSourceTarget; import org.apache.crunch.io.avro.AvroFileSourceTarget; @@ -29,6 +31,7 @@ import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.avro.AvroType; import org.apache.crunch.types.avro.Avros; import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; @@ -85,7 +88,45 @@ public class At { public static SourceTarget avroFile(Path path, Class avroClass) { return avroFile(path, Avros.specifics(avroClass)); } - + + /** + * Creates a {@code SourceTarget} 
by reading the schema of the Avro file + * at the given path. If the path is a directory, the schema of a file in the directory + * will be used to determine the schema to use. + * + * @param pathName The name of the path to the data on the filesystem + * @return A new {@code SourceTarget} instance + */ + public static SourceTarget avroFile(String pathName) { + return avroFile(new Path(pathName)); + } + + /** + * Creates a {@code SourceTarget} by reading the schema of the Avro file + * at the given path. If the path is a directory, the schema of a file in the directory + * will be used to determine the schema to use. + * + * @param path The path to the data on the filesystem + * @return A new {@code SourceTarget} instance + */ + public static SourceTarget avroFile(Path path) { + return avroFile(path, new Configuration()); + } + + /** + * Creates a {@code SourceTarget} by reading the schema of the Avro file + * at the given path using the {@code FileSystem} information contained in the given + * {@code Configuration} instance. If the path is a directory, the schema of a file in + * the directory will be used to determine the schema to use. + * + * @param path The path to the data on the filesystem + * @param conf The configuration information + * @return A new {@code SourceTarget} instance + */ + public static SourceTarget avroFile(Path path, Configuration conf) { + return avroFile(path, Avros.generics(From.getSchemaFromPath(path, conf))); + } + /** * Creates a {@code SourceTarget} instance from the Avro file(s) at the given path name. 
* http://git-wip-us.apache.org/repos/asf/crunch/blob/5f6651db/crunch-core/src/main/java/org/apache/crunch/io/From.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/From.java b/crunch-core/src/main/java/org/apache/crunch/io/From.java index e4cfb6a..0f5d6e0 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/From.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/From.java @@ -17,6 +17,12 @@ */ package org.apache.crunch.io; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.FsInput; import org.apache.avro.specific.SpecificRecord; import org.apache.crunch.Source; import org.apache.crunch.TableSource; @@ -31,10 +37,16 @@ import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.avro.AvroType; import org.apache.crunch.types.avro.Avros; import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import java.io.IOException; + /** *

<p>Static factory methods for creating common {@link Source} types.</p>

* @@ -87,7 +99,7 @@ public class From { * {@code FileInputFormat} implementations not covered by the provided {@code TableSource} * and {@code Source} factory methods. * - * @param The {@code Path} to the data + * @param path The {@code Path} to the data * @param formatClass The {@code FileInputFormat} implementation * @param keyClass The {@code Writable} to use for the key * @param valueClass The {@code Writable} to use for the value @@ -122,7 +134,7 @@ public class From { * {@code FileInputFormat} implementations not covered by the provided {@code TableSource} * and {@code Source} factory methods. * - * @param The {@code Path} to the data + * @param path The {@code Path} to the data * @param formatClass The {@code FileInputFormat} implementation * @param keyType The {@code PType} to use for the key * @param valueType The {@code PType} to use for the value @@ -180,6 +192,76 @@ public class From { } /** + * Creates a {@code Source} by reading the schema of the Avro file + * at the given path. If the path is a directory, the schema of a file in the directory + * will be used to determine the schema to use. + * + * @param pathName The name of the path to the data on the filesystem + * @return A new {@code Source} instance + */ + public static Source avroFile(String pathName) { + return avroFile(new Path(pathName)); + } + + /** + * Creates a {@code Source} by reading the schema of the Avro file + * at the given path. If the path is a directory, the schema of a file in the directory + * will be used to determine the schema to use. + * + * @param path The path to the data on the filesystem + * @return A new {@code Source} instance + */ + public static Source avroFile(Path path) { + return avroFile(path, new Configuration()); + } + + /** + * Creates a {@code Source} by reading the schema of the Avro file + * at the given path using the {@code FileSystem} information contained in the given + * {@code Configuration} instance. 
If the path is a directory, the schema of a file in + * the directory will be used to determine the schema to use. + * + * @param path The path to the data on the filesystem + * @param conf The configuration information + * @return A new {@code Source} instance + */ + public static Source avroFile(Path path, Configuration conf) { + return avroFile(path, Avros.generics(getSchemaFromPath(path, conf))); + } + + static Schema getSchemaFromPath(Path path, Configuration conf) { + DataFileReader reader = null; + try { + FileSystem fs = FileSystem.get(conf); + if (!fs.isFile(path)) { + FileStatus[] fstat = fs.listStatus(path, new PathFilter() { + @Override + public boolean accept(Path path) { + String name = path.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }); + if (fstat == null || fstat.length == 0) { + throw new IllegalArgumentException("No valid files found in directory: " + path); + } + path = fstat[0].getPath(); + } + reader = new DataFileReader(new FsInput(path, conf), new GenericDatumReader()); + return reader.getSchema(); + } catch (IOException e) { + throw new RuntimeException("Error reading schema from path: " + path, e); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + // ignored + } + } + } + } + + /** * Creates a {@code Source} instance from the SequenceFile(s) at the given path name * from the value field of each key-value pair in the SequenceFile(s). *