crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-337: Add From methods to support multiple input paths
Date Wed, 05 Feb 2014 00:47:40 GMT
Updated Branches:
  refs/heads/apache-crunch-0.8 1552bc67a -> dfb0402f7


CRUNCH-337: Add From methods to support multiple input paths


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/dfb0402f
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/dfb0402f
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/dfb0402f

Branch: refs/heads/apache-crunch-0.8
Commit: dfb0402f79499bec335c03c05fc4804ba9eba9df
Parents: 1552bc6
Author: Josh Wills <jwills@apache.org>
Authored: Tue Feb 4 07:27:19 2014 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Tue Feb 4 16:47:31 2014 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/io/From.java    | 163 ++++++++++++++++++-
 1 file changed, 160 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/dfb0402f/crunch-core/src/main/java/org/apache/crunch/io/From.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/From.java b/crunch-core/src/main/java/org/apache/crunch/io/From.java
index 0f5d6e0..14793a6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/From.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/From.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * <p>Static factory methods for creating common {@link Source} types.</p>
@@ -114,6 +115,24 @@ public class From {
 
   /**
    * Creates a {@code TableSource<K, V>} for reading data from files that have custom
+   * {@code FileInputFormat<K, V>} implementations not covered by the provided {@code TableSource}
+   * and {@code Source} factory methods.
+   *
+   * @param paths A list of {@code Path}s to the data
+   * @param formatClass The {@code FileInputFormat} implementation
+   * @param keyClass The {@code Writable} to use for the key
+   * @param valueClass The {@code Writable} to use for the value
+   * @return A new {@code TableSource<K, V>} instance
+   */
+  public static <K extends Writable, V extends Writable> TableSource<K, V> formattedFile(
+      List<Path> paths, Class<? extends FileInputFormat<K, V>> formatClass,
+      Class<K> keyClass, Class<V> valueClass) {
+    return formattedFile(paths, formatClass, Writables.writables(keyClass),
+        Writables.writables(valueClass));
+  }
+
+  /**
+   * Creates a {@code TableSource<K, V>} for reading data from files that have custom
    * {@code FileInputFormat} implementations not covered by the provided {@code TableSource}
    * and {@code Source} factory methods.
    * 
@@ -148,6 +167,24 @@ public class From {
   }
 
   /**
+   * Creates a {@code TableSource<K, V>} for reading data from files that have custom
+   * {@code FileInputFormat} implementations not covered by the provided {@code TableSource}
+   * and {@code Source} factory methods.
+   *
+   * @param paths A list of {@code Path}s to the data
+   * @param formatClass The {@code FileInputFormat} implementation
+   * @param keyType The {@code PType} to use for the key
+   * @param valueType The {@code PType} to use for the value
+   * @return A new {@code TableSource<K, V>} instance
+   */
+  public static <K, V> TableSource<K, V> formattedFile(List<Path> paths,
+                                                       Class<? extends FileInputFormat<?, ?>> formatClass,
+                                                       PType<K> keyType, PType<V> valueType) {
+    PTableType<K, V> tableType = keyType.getFamily().tableOf(keyType, valueType);
+    return new FileTableSourceImpl<K, V>(paths, tableType, formatClass);
+  }
+
+  /**
    * Creates a {@code Source<T>} instance from the Avro file(s) at the given path name.
    * 
    * @param pathName The name of the path to the data on the filesystem
@@ -168,7 +205,18 @@ public class From {
   public static <T extends SpecificRecord> Source<T> avroFile(Path path, Class<T> avroClass) {
     return avroFile(path, Avros.specifics(avroClass));  
   }
-  
+
+  /**
+   * Creates a {@code Source<T>} instance from the Avro file(s) at the given {@code Path}s.
+   *
+   * @param paths A list of {@code Path}s to the data
+   * @param avroClass The subclass of {@code SpecificRecord} to use for the Avro file
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T extends SpecificRecord> Source<T> avroFile(List<Path> paths, Class<T> avroClass) {
+    return avroFile(paths, Avros.specifics(avroClass));
+  }
+
   /**
    * Creates a {@code Source<T>} instance from the Avro file(s) at the given path name.
    * 
@@ -192,6 +240,17 @@ public class From {
   }
 
   /**
+   * Creates a {@code Source<T>} instance from the Avro file(s) at the given {@code Path}s.
+   *
+   * @param paths A list of {@code Path}s to the data
+   * @param avroType The {@code AvroType} for the Avro records
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T> Source<T> avroFile(List<Path> paths, AvroType<T> avroType) {
+    return new AvroFileSource<T>(paths, avroType);
+  }
+
+  /**
    * Creates a {@code Source<GenericData.Record>} by reading the schema of the Avro file
    * at the given path. If the path is a directory, the schema of a file in the directory
    * will be used to determine the schema to use.
@@ -217,6 +276,18 @@ public class From {
 
   /**
    * Creates a {@code Source<GenericData.Record>} by reading the schema of the Avro file
+   * at the given paths. If the first path is a directory, the schema of a file in the directory
+   * will be used to determine the schema to use.
+   *
+   * @param paths A list of paths to the data on the filesystem
+   * @return A new {@code Source<GenericData.Record>} instance
+   */
+  public static Source<GenericData.Record> avroFile(List<Path> paths) {
+    return avroFile(paths, new Configuration());
+  }
+
+  /**
+   * Creates a {@code Source<GenericData.Record>} by reading the schema of the Avro file
    * at the given path using the {@code FileSystem} information contained in the given
    * {@code Configuration} instance. If the path is a directory, the schema of a file in
    * the directory will be used to determine the schema to use.
@@ -229,6 +300,20 @@ public class From {
     return avroFile(path, Avros.generics(getSchemaFromPath(path, conf)));
   }
 
+  /**
+   * Creates a {@code Source<GenericData.Record>} by reading the schema of the Avro file
+   * at the given paths using the {@code FileSystem} information contained in the given
+   * {@code Configuration} instance. If the first path is a directory, the schema of a file in
+   * the directory will be used to determine the schema to use.
+   *
+   * @param paths A list of paths to the data on the filesystem
+   * @param conf The configuration information
+   * @return A new {@code Source<GenericData.Record>} instance
+   */
+  public static Source<GenericData.Record> avroFile(List<Path> paths, Configuration conf) {
+    return avroFile(paths, Avros.generics(getSchemaFromPath(paths.get(0), conf)));
+  }
+
   static Schema getSchemaFromPath(Path path, Configuration conf) {
     DataFileReader reader = null;
     try {
@@ -284,7 +369,19 @@ public class From {
   public static <T extends Writable> Source<T> sequenceFile(Path path, Class<T> valueClass) {
     return sequenceFile(path, Writables.writables(valueClass));
   }
-  
+
+  /**
+   * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given {@code Path}s
+   * from the value field of each key-value pair in the SequenceFile(s).
+   *
+   * @param paths A list of {@code Path}s to the data
+   * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T extends Writable> Source<T> sequenceFile(List<Path> paths, Class<T> valueClass) {
+    return sequenceFile(paths, Writables.writables(valueClass));
+  }
+
   /**
    * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given path name
    * from the value field of each key-value pair in the SequenceFile(s).
@@ -310,6 +407,18 @@ public class From {
   }
 
   /**
+   * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given {@code Path}s
+   * from the value field of each key-value pair in the SequenceFile(s).
+   *
+   * @param paths A list of {@code Path}s to the data
+   * @param ptype The {@code PType} for the value of the SequenceFile entry
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T> Source<T> sequenceFile(List<Path> paths, PType<T> ptype) {
+    return new SeqFileSource<T>(paths, ptype);
+  }
+
+  /**
    * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given path name.
    * 
    * @param pathName The name of the path to the data on the filesystem
@@ -334,7 +443,20 @@ public class From {
       Path path, Class<K> keyClass, Class<V> valueClass) {
     return sequenceFile(path, Writables.writables(keyClass), Writables.writables(valueClass));
   }
-  
+
+  /**
+   * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given {@code Path}s.
+   *
+   * @param paths A list of {@code Path}s to the data
+   * @param keyClass The {@code Writable} subclass for the key of the SequenceFile entry
+   * @param valueClass The {@code Writable} subclass for the value of the SequenceFile entry
+   * @return A new {@code TableSource<K, V>} instance
+   */
+  public static <K extends Writable, V extends Writable> TableSource<K, V> sequenceFile(
+      List<Path> paths, Class<K> keyClass, Class<V> valueClass) {
+    return sequenceFile(paths, Writables.writables(keyClass), Writables.writables(valueClass));
+  }
+
   /**
    * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given path name.
    * 
@@ -361,6 +483,19 @@ public class From {
   }
 
   /**
+   * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given {@code Path}s.
+   *
+   * @param paths A list of {@code Path}s to the data
+   * @param keyType The {@code PType} for the key of the SequenceFile entry
+   * @param valueType The {@code PType} for the value of the SequenceFile entry
+   * @return A new {@code TableSource<K, V>} instance
+   */
+  public static <K, V> TableSource<K, V> sequenceFile(List<Path> paths, PType<K> keyType, PType<V> valueType) {
+    PTypeFamily ptf = keyType.getFamily();
+    return new SeqFileTableSource<K, V>(paths, ptf.tableOf(keyType, valueType));
+  }
+
+  /**
    * Creates a {@code Source<String>} instance for the text file(s) at the given path name.
    * 
    * @param pathName The name of the path to the data on the filesystem
@@ -381,6 +516,16 @@ public class From {
   }
 
   /**
+   * Creates a {@code Source<String>} instance for the text file(s) at the given {@code Path}s.
+   *
+   * @param paths A list of {@code Path}s to the data
+   * @return A new {@code Source<String>} instance
+   */
+  public static Source<String> textFile(List<Path> paths) {
+    return textFile(paths, Writables.strings());
+  }
+
+  /**
    * Creates a {@code Source<T>} instance for the text file(s) at the given path name using
    * the provided {@code PType<T>} to convert the input text.
    * 
@@ -403,4 +548,16 @@ public class From {
   public static <T> Source<T> textFile(Path path, PType<T> ptype) {
     return new TextFileSource<T>(path, ptype);
   }
+
+  /**
+   * Creates a {@code Source<T>} instance for the text file(s) at the given {@code Path}s using
+   * the provided {@code PType<T>} to convert the input text.
+   *
+   * @param paths A list of {@code Path}s to the data
+   * @param ptype The {@code PType<T>} to use to process the input text
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T> Source<T> textFile(List<Path> paths, PType<T> ptype) {
+    return new TextFileSource<T>(paths, ptype);
+  }
 }


Mime
View raw message