crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-219: Allow FileSourceImpl to take in multiple paths
Date Tue, 25 Jun 2013 04:58:31 GMT
Updated Branches:
  refs/heads/master c51bcd63a -> 38a97e54c


CRUNCH-219: Allow FileSourceImpl to take in multiple paths


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/38a97e54
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/38a97e54
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/38a97e54

Branch: refs/heads/master
Commit: 38a97e54cef99749731a4b88d59b3100cc3b87e9
Parents: c51bcd6
Author: Josh Wills <jwills@apache.org>
Authored: Mon Jun 24 21:47:09 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Mon Jun 24 21:47:09 2013 -0700

----------------------------------------------------------------------
 .../crunch/io/avro/AvroMemPipelineIT.java       | 35 ++++++++
 .../apache/crunch/io/avro/AvroFileSource.java   | 12 +--
 .../crunch/io/avro/trevni/TrevniKeySource.java  | 12 +--
 .../apache/crunch/io/impl/FileSourceImpl.java   | 84 ++++++++++++++++----
 .../crunch/io/impl/FileTableSourceImpl.java     |  9 +++
 .../org/apache/crunch/io/seq/SeqFileSource.java | 12 +--
 .../crunch/io/seq/SeqFileTableSource.java       | 13 +--
 .../apache/crunch/io/text/NLineFileSource.java  | 18 ++++-
 .../crunch/io/text/TextFileReaderFactory.java   | 10 +++
 .../apache/crunch/io/text/TextFileSource.java   | 11 ++-
 .../crunch/io/text/TextFileTableSource.java     | 18 ++++-
 11 files changed, 186 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
index 9cafa3f..cfb669e 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroMemPipelineIT.java
@@ -18,12 +18,14 @@ package org.apache.crunch.io.avro;
 
 import static org.junit.Assert.assertEquals;
 
+import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 
+import java.util.Set;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericRecord;
@@ -35,6 +37,7 @@ import org.apache.crunch.test.Person;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.Path;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -118,4 +121,36 @@ public class AvroMemPipelineIT implements Serializable {
     assertEquals(writeRecord, readRecord.toString());
   }
 
+  @Test
+  public void testMemPipelineWithMultiplePaths() {
+
+    GenericRecord writeRecord1 = createGenericRecord("John Doe");
+    final PCollection<GenericRecord> writeCollection1 = MemPipeline.collectionOf(Collections.singleton(writeRecord1));
+    writeCollection1.write(To.avroFile(avroFile.getAbsolutePath()));
+
+    File avroFile2 = tmpDir.getFile("test2.avro");
+    GenericRecord writeRecord2 = createGenericRecord("Jane Doe");
+    final PCollection<GenericRecord> writeCollection2 = MemPipeline.collectionOf(Collections.singleton(writeRecord2));
+    writeCollection2.write(To.avroFile(avroFile2.getAbsolutePath()));
+
+    List<Path> paths = Lists.newArrayList(new Path(avroFile.getAbsolutePath()),
+        new Path(avroFile2.getAbsolutePath()));
+    PCollection<Record> readCollection = MemPipeline.getInstance().read(
+        new AvroFileSource<Record>(paths, Avros.generics(writeRecord1.getSchema())));
+
+    Set<Record> readSet = Sets.newHashSet(readCollection.materialize());
+
+    assertEquals(Sets.newHashSet(writeRecord1, writeRecord2), readSet);
+  }
+
+  private GenericRecord createGenericRecord(String name) {
+
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", name);
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy"));
+
+    return savedRecord;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
index 15792bf..3e1e933 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
@@ -19,8 +19,8 @@ package org.apache.crunch.io.avro;
 
 import java.io.IOException;
 
+import java.util.List;
 import org.apache.avro.mapred.AvroJob;
-import org.apache.crunch.io.CompositePathIterable;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileSourceImpl;
@@ -28,7 +28,6 @@ import org.apache.crunch.types.avro.AvroInputFormat;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T>
{
@@ -45,14 +44,17 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour
     super(path, ptype, getBundle(ptype));
   }
 
+  public AvroFileSource(List<Path> paths, AvroType<T> ptype) {
+    super(paths, ptype, getBundle(ptype));
+  }
+
   @Override
   public String toString() {
-    return "Avro(" + path.toString() + ")";
+    return "Avro(" + pathsAsString() + ")";
   }
 
   @Override
   public Iterable<T> read(Configuration conf) throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
-    return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>((AvroType<T>)
ptype));
+    return read(conf, new AvroFileReaderFactory<T>((AvroType<T>) ptype));
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java
index 193ac1b..bee7ec1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeySource.java
@@ -17,15 +17,14 @@
  */
 package org.apache.crunch.io.avro.trevni;
 
+import java.util.List;
 import org.apache.avro.mapred.AvroJob;
-import org.apache.crunch.io.CompositePathIterable;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileSourceImpl;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.trevni.avro.mapreduce.AvroTrevniKeyInputFormat;
 
@@ -45,14 +44,17 @@ public class TrevniKeySource<T> extends FileSourceImpl<T> implements ReadableSou
     super(path, ptype, getBundle(ptype));
   }
 
+  public TrevniKeySource(List<Path> paths, AvroType<T> ptype) {
+    super(paths, ptype, getBundle(ptype));
+  }
+
   @Override
   public String toString() {
-    return "TrevniKey(" + path.toString() + ")";
+    return "TrevniKey(" + pathsAsString() + ")";
   }
 
   @Override
   public Iterable<T> read(Configuration conf) throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
-    return CompositePathIterable.create(fs, path, new TrevniFileReaderFactory<T>((AvroType<T>)
ptype));
+    return read(conf, new TrevniFileReaderFactory<T>((AvroType<T>) ptype));
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
index 688c801..44139b0 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -17,17 +17,25 @@
  */
 package org.apache.crunch.io.impl;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import java.io.IOException;
 
+import java.util.List;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.Source;
+import org.apache.crunch.io.CompositePathIterable;
 import org.apache.crunch.io.CrunchInputs;
+import org.apache.crunch.io.FileReaderFactory;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.SourceTargetHelper;
+import org.apache.crunch.io.avro.AvroFileReaderFactory;
 import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroType;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -37,34 +45,58 @@ public class FileSourceImpl<T> implements Source<T> {
 
   private static final Log LOG = LogFactory.getLog(FileSourceImpl.class);
 
+  @Deprecated
   protected final Path path;
+  protected final List<Path> paths;
   protected final PType<T> ptype;
   protected final FormatBundle<? extends InputFormat> inputBundle;
 
   public FileSourceImpl(Path path, PType<T> ptype, Class<? extends InputFormat>
inputFormatClass) {
-    this.path = path;
-    this.ptype = ptype;
-    this.inputBundle = FormatBundle.forInput(inputFormatClass);
+    this(path, ptype, FormatBundle.forInput(inputFormatClass));
   }
 
   public FileSourceImpl(Path path, PType<T> ptype, FormatBundle<? extends InputFormat>
inputBundle) {
-    this.path = path;
+    this(Lists.newArrayList(path), ptype, inputBundle);
+  }
+
+  public FileSourceImpl(List<Path> paths, PType<T> ptype, Class<? extends
InputFormat> inputFormatClass) {
+    this(paths, ptype, FormatBundle.forInput(inputFormatClass));
+  }
+
+  public FileSourceImpl(List<Path> paths, PType<T> ptype, FormatBundle<? extends
InputFormat> inputBundle) {
+    this.path = paths.isEmpty() ? null : paths.get(0);
+    this.paths = paths;
     this.ptype = ptype;
     this.inputBundle = inputBundle;
   }
 
+  @Deprecated
   public Path getPath() {
-    return path;
+    if (paths.isEmpty()) {
+      return null;
+    } else if (paths.size() > 1) {
+      LOG.warn("getPath() called for source with multiple paths, only " +
+          "returning first. Source: " + this);
+    }
+    return paths.get(0);
+  }
+
+  public List<Path> getPaths() {
+    return paths;
   }
   
   @Override
   public void configureSource(Job job, int inputId) throws IOException {
     if (inputId == -1) {
-      FileInputFormat.addInputPath(job, path);
+      for (Path path : paths) {
+        FileInputFormat.addInputPath(job, path);
+      }
       job.setInputFormatClass(inputBundle.getFormatClass());
       inputBundle.configure(job.getConfiguration());
     } else {
-      CrunchInputs.addInputPath(job, path, inputBundle, inputId);
+      for (Path path : paths) {
+        CrunchInputs.addInputPath(job, path, inputBundle, inputId);
+      }
     }
   }
 
@@ -75,12 +107,34 @@ public class FileSourceImpl<T> implements Source<T> {
 
   @Override
   public long getSize(Configuration configuration) {
-    try {
-      return SourceTargetHelper.getPathSize(configuration, path);
-    } catch (IOException e) {
-      LOG.warn(String.format("Exception thrown looking up size of: %s", path), e);
-      throw new IllegalStateException("Failed to get the file size of:" + path, e);
+    long size = 0;
+    for (Path path : paths) {
+      try {
+        size += SourceTargetHelper.getPathSize(configuration, path);
+      } catch (IOException e) {
+        LOG.warn(String.format("Exception thrown looking up size of: %s", path), e);
+        throw new IllegalStateException("Failed to get the file size of:" + path, e);
+      }
+    }
+    return size;
+  }
+
+  protected Iterable<T> read(Configuration conf, FileReaderFactory<T> readerFactory)
+      throws IOException {
+    List<Iterable<T>> iterables = Lists.newArrayList();
+    for (Path path : paths) {
+      FileSystem fs = path.getFileSystem(conf);
+      iterables.add(CompositePathIterable.create(fs, path, readerFactory));
+    }
+    return Iterables.concat(iterables);
+  }
+
+  /* Retain string format for single-path sources */
+  protected String pathsAsString() {
+    if (paths.size() == 1) {
+      return paths.get(0).toString();
     }
+    return paths.toString();
   }
 
   @Override
@@ -89,16 +143,16 @@ public class FileSourceImpl<T> implements Source<T> {
       return false;
     }
     FileSourceImpl o = (FileSourceImpl) other;
-    return ptype.equals(o.ptype) && path.equals(o.path) && inputBundle.equals(o.inputBundle);
+    return ptype.equals(o.ptype) && paths.equals(o.paths) && inputBundle.equals(o.inputBundle);
   }
 
   @Override
   public int hashCode() {
-    return new HashCodeBuilder().append(ptype).append(path).append(inputBundle).toHashCode();
+    return new HashCodeBuilder().append(ptype).append(paths).append(inputBundle).toHashCode();
   }
 
   @Override
   public String toString() {
-    return new StringBuilder().append(inputBundle.getName()).append("(").append(path).append(")").toString();
+    return new StringBuilder().append(inputBundle.getName()).append("(").append(pathsAsString()).append(")").toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
index 295edb5..ba313a4 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.io.impl;
 
+import java.util.List;
 import org.apache.crunch.Pair;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.io.FormatBundle;
@@ -30,9 +31,17 @@ public class FileTableSourceImpl<K, V> extends FileSourceImpl<Pair<K, V>> implem
     super(path, tableType, formatClass);
   }
 
+  public FileTableSourceImpl(List<Path> paths, PTableType<K, V> tableType, Class<?
extends FileInputFormat> formatClass) {
+    super(paths, tableType, formatClass);
+  }
+
   public FileTableSourceImpl(Path path, PTableType<K, V> tableType, FormatBundle bundle)
{
     super(path, tableType, bundle);
   }
+
+  public FileTableSourceImpl(List<Path> paths, PTableType<K, V> tableType, FormatBundle
bundle) {
+    super(paths, tableType, bundle);
+  }
   
   @Override
   public PTableType<K, V> getTableType() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
index 8fac4ae..9e6edc8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileSource.java
@@ -19,12 +19,11 @@ package org.apache.crunch.io.seq;
 
 import java.io.IOException;
 
-import org.apache.crunch.io.CompositePathIterable;
+import java.util.List;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileSourceImpl;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 
@@ -34,14 +33,17 @@ public class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSourc
     super(path, ptype, SequenceFileInputFormat.class);
   }
 
+  public SeqFileSource(List<Path> paths, PType<T> ptype) {
+    super(paths, ptype, SequenceFileInputFormat.class);
+  }
+
   @Override
   public Iterable<T> read(Configuration conf) throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
-    return CompositePathIterable.create(fs, path, new SeqFileReaderFactory<T>(ptype));
+    return read(conf, new SeqFileReaderFactory<T>(ptype));
   }
 
   @Override
   public String toString() {
-    return "SeqFile(" + path.toString() + ")";
+    return "SeqFile(" + pathsAsString() + ")";
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
index 7a63272..cecafeb 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/seq/SeqFileTableSource.java
@@ -19,13 +19,12 @@ package org.apache.crunch.io.seq;
 
 import java.io.IOException;
 
+import java.util.List;
 import org.apache.crunch.Pair;
-import org.apache.crunch.io.CompositePathIterable;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileTableSourceImpl;
 import org.apache.crunch.types.PTableType;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 
@@ -43,15 +42,17 @@ public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implemen
     super(path, ptype, SequenceFileInputFormat.class);
   }
 
+  public SeqFileTableSource(List<Path> paths, PTableType<K, V> ptype) {
+    super(paths, ptype, SequenceFileInputFormat.class);
+  }
+
   @Override
   public Iterable<Pair<K, V>> read(Configuration conf) throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
-    return CompositePathIterable.create(fs, path,
-        new SeqFileReaderFactory<Pair<K, V>>(getTableType()));
+    return read(conf, new SeqFileReaderFactory<Pair<K, V>>(getTableType()));
   }
 
   @Override
   public String toString() {
-    return "SeqFile(" + path.toString() + ")";
+    return "SeqFile(" + pathsAsString() + ")";
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
index 40e2dbd..abef771 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
@@ -19,7 +19,7 @@ package org.apache.crunch.io.text;
 
 import java.io.IOException;
 
-import org.apache.crunch.io.CompositePathIterable;
+import java.util.List;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileSourceImpl;
@@ -64,14 +64,24 @@ public class NLineFileSource<T> extends FileSourceImpl<T> implements ReadableSou
     super(path, ptype, getBundle(linesPerTask));
   }
 
+  /**
+   * Create a new {@code NLineFileSource} instance.
+   *
+   * @param paths The {@code Path}s to the input data
+   * @param ptype The PType to use for processing the data
+   * @param linesPerTask The number of lines from the input each map task will process
+   */
+  public NLineFileSource(List<Path> paths, PType<T> ptype, int linesPerTask)
{
+    super(paths, ptype, getBundle(linesPerTask));
+  }
+
   @Override
   public String toString() {
-    return "NLine(" + path + ")";
+    return "NLine(" + pathsAsString() + ")";
   }
   
   @Override
   public Iterable<T> read(Configuration conf) throws IOException {
-    return CompositePathIterable.create(path.getFileSystem(conf), path,
-        new TextFileReaderFactory<T>(LineParser.forType(ptype)));
+    return read(conf, new TextFileReaderFactory<T>(LineParser.forType(ptype)));
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
index e1fea6e..851d199 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
@@ -62,11 +62,17 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> {
 
     final BufferedReader reader = new BufferedReader(new InputStreamReader(is));
     return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
+      boolean nextChecked = false;
       private String nextLine;
 
       @Override
       public boolean hasNext() {
+        if (nextChecked) {
+          return nextLine != null;
+        }
+
         try {
+          nextChecked = true;
           return (nextLine = reader.readLine()) != null;
         } catch (IOException e) {
           LOG.info("Exception reading text file stream", e);
@@ -76,6 +82,10 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> {
 
       @Override
       public T next() {
+        if (!nextChecked && !hasNext()) {
+          return null;
+        }
+        nextChecked = false;
         return parser.parse(nextLine);
       }
     });

http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java
index 026fca9..ca8cbaf 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileSource.java
@@ -19,7 +19,7 @@ package org.apache.crunch.io.text;
 
 import java.io.IOException;
 
-import org.apache.crunch.io.CompositePathIterable;
+import java.util.List;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileSourceImpl;
 import org.apache.crunch.types.PType;
@@ -51,6 +51,10 @@ public class TextFileSource<T> extends FileSourceImpl<T> implements ReadableSour
     super(path, ptype, getInputFormat(path, ptype));
   }
 
+  public TextFileSource(List<Path> paths, PType<T> ptype) {
+    super(paths, ptype, getInputFormat(paths.get(0), ptype));
+  }
+
   @Override
   public long getSize(Configuration conf) {
     long sz = super.getSize(conf);
@@ -62,12 +66,11 @@ public class TextFileSource<T> extends FileSourceImpl<T> implements ReadableSour
 
   @Override
   public String toString() {
-    return "Text(" + path + ")";
+    return "Text(" + pathsAsString() + ")";
   }
 
   @Override
   public Iterable<T> read(Configuration conf) throws IOException {
-    return CompositePathIterable.create(path.getFileSystem(conf), path,
-        new TextFileReaderFactory<T>(LineParser.forType(ptype)));
+    return read(conf, new TextFileReaderFactory<T>(LineParser.forType(ptype)));
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/38a97e54/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
index 94fc5fd..66b2e67 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTableSource.java
@@ -19,8 +19,8 @@ package org.apache.crunch.io.text;
 
 import java.io.IOException;
 
+import java.util.List;
 import org.apache.crunch.Pair;
-import org.apache.crunch.io.CompositePathIterable;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileTableSourceImpl;
@@ -58,6 +58,10 @@ public class TextFileTableSource<K, V> extends FileTableSourceImpl<K, V>
   public TextFileTableSource(Path path, PTableType<K, V> tableType) {
     this(path, tableType, "\t");
   }
+
+  public TextFileTableSource(List<Path> paths, PTableType<K, V> tableType) {
+    this(paths, tableType, "\t");
+  }
   
   public TextFileTableSource(String path, PTableType<K, V> tableType, String separator)
{
     this(new Path(path), tableType, separator);
@@ -68,14 +72,20 @@ public class TextFileTableSource<K, V> extends FileTableSourceImpl<K, V>
     this.separator = separator;
   }
 
+  public TextFileTableSource(List<Path> paths, PTableType<K, V> tableType, String
separator) {
+    super(paths, tableType, getBundle(separator));
+    this.separator = separator;
+  }
+
   @Override
   public String toString() {
-    return "KeyValueText(" + path + ")";
+    return "KeyValueText(" + pathsAsString() + ")";
   }
 
   @Override
   public Iterable<Pair<K, V>> read(Configuration conf) throws IOException {
-    return CompositePathIterable.create(path.getFileSystem(conf), path,
-        new TextFileReaderFactory<Pair<K, V>>(LineParser.forTableType(getTableType(),
separator)));
+    return read(conf,
+        new TextFileReaderFactory<Pair<K, V>>(LineParser.forTableType(getTableType(),
+            separator)));
   }
 }


Mime
View raw message