crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-333: From and At methods to support reading Avro files using the on-disk schema
Date Tue, 28 Jan 2014 03:17:57 GMT
Updated Branches:
  refs/heads/master 0cbccccac -> 4406ded02


CRUNCH-333: From and At methods to support reading Avro files using the on-disk schema


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/4406ded0
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/4406ded0
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/4406ded0

Branch: refs/heads/master
Commit: 4406ded0264bfd552738bea4483603fdb2c184a0
Parents: 0cbcccc
Author: Josh Wills <jwills@apache.org>
Authored: Mon Jan 27 16:50:11 2014 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Mon Jan 27 18:40:07 2014 -0800

----------------------------------------------------------------------
 .../crunch/io/avro/AvroFileSourceTargetIT.java  | 20 +++++
 .../src/main/java/org/apache/crunch/io/At.java  | 43 +++++++++-
 .../main/java/org/apache/crunch/io/From.java    | 86 +++++++++++++++++++-
 3 files changed, 146 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/4406ded0/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
index 671b920..9f51f23 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
@@ -36,11 +36,13 @@ import org.apache.crunch.PCollection;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
+import org.apache.crunch.io.From;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.test.StringWrapper;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.Path;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -102,6 +104,24 @@ public class AvroFileSourceTargetIT implements Serializable {
   }
 
   @Test
+  public void testReadAsGeneric() throws IOException {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<GenericData.Record> genericCollection = pipeline.read(From.avroFile(
+        new Path(avroFile.getAbsolutePath()),
+        tmpDir.getDefaultConfiguration()));
+
+    List<GenericData.Record> personList = Lists.newArrayList(genericCollection.materialize());
+
+    assertEquals(Lists.newArrayList(savedRecord), Lists.newArrayList(personList));
+  }
+
+  @Test
   public void testGeneric() throws IOException {
     String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", "GenericPerson");
     Schema genericPersonSchema = new Schema.Parser().parse(genericSchemaJson);

http://git-wip-us.apache.org/repos/asf/crunch/blob/4406ded0/crunch-core/src/main/java/org/apache/crunch/io/At.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/At.java b/crunch-core/src/main/java/org/apache/crunch/io/At.java
index a6f0782..e60831a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/At.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/At.java
@@ -17,7 +17,9 @@
  */
 package org.apache.crunch.io;
 
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.specific.SpecificRecord;
+import org.apache.crunch.Source;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.TableSourceTarget;
 import org.apache.crunch.io.avro.AvroFileSourceTarget;
@@ -29,6 +31,7 @@ import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 
@@ -85,7 +88,45 @@ public class At {
   public static <T extends SpecificRecord> SourceTarget<T> avroFile(Path path, Class<T> avroClass) {
     return avroFile(path, Avros.specifics(avroClass));  
   }
-  
+
+  /**
+   * Creates a {@code SourceTarget<GenericData.Record>} by reading the schema of the Avro file
+   * at the given path. If the path is a directory, the schema of a file in the directory
+   * will be used to determine the schema to use.
+   *
+   * @param pathName The name of the path to the data on the filesystem
+   * @return A new {@code SourceTarget<GenericData.Record>} instance
+   */
+  public static SourceTarget<GenericData.Record> avroFile(String pathName) {
+    return avroFile(new Path(pathName));
+  }
+
+  /**
+   * Creates a {@code SourceTarget<GenericData.Record>} by reading the schema of the Avro file
+   * at the given path. If the path is a directory, the schema of a file in the directory
+   * will be used to determine the schema to use.
+   *
+   * @param path The path to the data on the filesystem
+   * @return A new {@code SourceTarget<GenericData.Record>} instance
+   */
+  public static SourceTarget<GenericData.Record> avroFile(Path path) {
+    return avroFile(path, new Configuration());
+  }
+
+  /**
+   * Creates a {@code SourceTarget<GenericData.Record>} by reading the schema of the Avro file
+   * at the given path using the {@code FileSystem} information contained in the given
+   * {@code Configuration} instance. If the path is a directory, the schema of a file in
+   * the directory will be used to determine the schema to use.
+   *
+   * @param path The path to the data on the filesystem
+   * @param conf The configuration information
+   * @return A new {@code SourceTarget<GenericData.Record>} instance
+   */
+  public static SourceTarget<GenericData.Record> avroFile(Path path, Configuration conf) {
+    return avroFile(path, Avros.generics(From.getSchemaFromPath(path, conf)));
+  }
+
   /**
    * Creates a {@code SourceTarget<T>} instance from the Avro file(s) at the given path name.
    * 

http://git-wip-us.apache.org/repos/asf/crunch/blob/4406ded0/crunch-core/src/main/java/org/apache/crunch/io/From.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/From.java b/crunch-core/src/main/java/org/apache/crunch/io/From.java
index e4cfb6a..0f5d6e0 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/From.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/From.java
@@ -17,6 +17,12 @@
  */
 package org.apache.crunch.io;
 
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.FsInput;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.crunch.Source;
 import org.apache.crunch.TableSource;
@@ -31,10 +37,16 @@ import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
+import java.io.IOException;
+
 /**
  * <p>Static factory methods for creating common {@link Source} types.</p>
  * 
@@ -87,7 +99,7 @@ public class From {
    * {@code FileInputFormat<K, V>} implementations not covered by the provided {@code TableSource}
    * and {@code Source} factory methods.
    * 
-   * @param  The {@code Path} to the data
+   * @param path The {@code Path} to the data
    * @param formatClass The {@code FileInputFormat} implementation
    * @param keyClass The {@code Writable} to use for the key
    * @param valueClass The {@code Writable} to use for the value
@@ -122,7 +134,7 @@ public class From {
    * {@code FileInputFormat} implementations not covered by the provided {@code TableSource}
    * and {@code Source} factory methods.
    * 
-   * @param  The {@code Path} to the data
+   * @param path The {@code Path} to the data
    * @param formatClass The {@code FileInputFormat} implementation
    * @param keyType The {@code PType} to use for the key
    * @param valueType The {@code PType} to use for the value
@@ -180,6 +192,76 @@ public class From {
   }
 
   /**
+   * Creates a {@code Source<GenericData.Record>} by reading the schema of the Avro file
+   * at the given path. If the path is a directory, the schema of a file in the directory
+   * will be used to determine the schema to use.
+   *
+   * @param pathName The name of the path to the data on the filesystem
+   * @return A new {@code Source<GenericData.Record>} instance
+   */
+  public static Source<GenericData.Record> avroFile(String pathName) {
+    return avroFile(new Path(pathName));
+  }
+
+  /**
+   * Creates a {@code Source<GenericData.Record>} by reading the schema of the Avro file
+   * at the given path. If the path is a directory, the schema of a file in the directory
+   * will be used to determine the schema to use.
+   *
+   * @param path The path to the data on the filesystem
+   * @return A new {@code Source<GenericData.Record>} instance
+   */
+  public static Source<GenericData.Record> avroFile(Path path) {
+    return avroFile(path, new Configuration());
+  }
+
+  /**
+   * Creates a {@code Source<GenericData.Record>} by reading the schema of the Avro file
+   * at the given path using the {@code FileSystem} information contained in the given
+   * {@code Configuration} instance. If the path is a directory, the schema of a file in
+   * the directory will be used to determine the schema to use.
+   *
+   * @param path The path to the data on the filesystem
+   * @param conf The configuration information
+   * @return A new {@code Source<GenericData.Record>} instance
+   */
+  public static Source<GenericData.Record> avroFile(Path path, Configuration conf) {
+    return avroFile(path, Avros.generics(getSchemaFromPath(path, conf)));
+  }
+
+  static Schema getSchemaFromPath(Path path, Configuration conf) {
+    DataFileReader reader = null;
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      if (!fs.isFile(path)) {
+        FileStatus[] fstat = fs.listStatus(path, new PathFilter() {
+          @Override
+          public boolean accept(Path path) {
+            String name = path.getName();
+            return !name.startsWith("_") && !name.startsWith(".");
+          }
+        });
+        if (fstat == null || fstat.length == 0) {
+          throw new IllegalArgumentException("No valid files found in directory: " + path);
+        }
+        path = fstat[0].getPath();
+      }
+      reader = new DataFileReader(new FsInput(path, conf), new GenericDatumReader<GenericRecord>());
+      return reader.getSchema();
+    } catch (IOException e) {
+      throw new RuntimeException("Error reading schema from path: "  + path, e);
+    } finally {
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (IOException e) {
+          // ignored
+        }
+      }
+    }
+  }
+
+  /**
    * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given path name
    * from the value field of each key-value pair in the SequenceFile(s).
    * 


Mime
View raw message