crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-400: Add PipelineResult option to MaterializableIterable
Date Thu, 17 Jul 2014 18:56:22 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 59263aaf2 -> ded504eb1


CRUNCH-400: Add PipelineResult option to MaterializableIterable


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ded504eb
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ded504eb
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ded504eb

Branch: refs/heads/master
Commit: ded504eb133fa0814e2d90ff2a662e72a67e04bb
Parents: 59263aa
Author: Josh Wills <jwills@apache.org>
Authored: Mon Jul 14 12:11:36 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Thu Jul 17 11:49:04 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/crunch/MaterializeIT.java   | 13 ++++---
 .../materialize/MaterializableIterable.java     | 39 ++++++++++++++++++--
 2 files changed, 42 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/ded504eb/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
index d064993..cb0f306 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java
@@ -23,9 +23,11 @@ import static junit.framework.Assert.assertTrue;
 import java.io.IOException;
 import java.util.List;
 
+import com.google.common.collect.Iterables;
 import org.apache.crunch.fn.FilterFns;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.test.StringWrapper;
 import org.apache.crunch.test.TemporaryPath;
@@ -104,8 +106,7 @@ public class MaterializeIT {
       throws IOException {
     String inputPath = tmpDir.copyResourceFileName("set1.txt");
     PCollection<String> empty = pipeline.readTextFile(inputPath).filter(FilterFns.<String>REJECT_ALL());
-
-    assertTrue(Lists.newArrayList(empty.materialize()).isEmpty());
+    assertTrue(Iterables.isEmpty(empty.materialize()));
     pipeline.done();
   }
 
@@ -126,14 +127,14 @@ public class MaterializeIT {
   public void testMaterializeAvroPersonAndReflectsPair_GroupedTable() throws IOException {
     Assume.assumeTrue(Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS);
     Pipeline pipeline = new MRPipeline(MaterializeIT.class);
-    List<Pair<StringWrapper, Person>> pairList = Lists.newArrayList(pipeline
+    MaterializableIterable<Pair<StringWrapper, Person>> mi = (MaterializableIterable) pipeline
         .readTextFile(tmpDir.copyResourceFileName("set1.txt"))
         .parallelDo(new StringToStringWrapperPersonPairMapFn(),
             Avros.pairs(Avros.reflects(StringWrapper.class), Avros.records(Person.class)))
-        .materialize());
+        .materialize();
     
     // We just need to make sure this doesn't crash
-    assertEquals(4, pairList.size());
-
+    assertEquals(4, Iterables.size(mi));
+    assertTrue(mi.getPipelineResult().succeeded());
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/ded504eb/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
index aeb1fba..d00a4c1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
@@ -20,16 +20,22 @@ package org.apache.crunch.materialize;
 import java.io.IOException;
 import java.util.Iterator;
 
+import com.google.common.collect.Iterators;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileSourceImpl;
 import org.apache.hadoop.fs.Path;
 
+/**
+ * A reference to the materialized output of a {@code PCollection} created
+ * by a subclass of {@code DistributedPipeline}.
+ */
 public class MaterializableIterable<E> implements Iterable<E> {
 
   private static final Log LOG = LogFactory.getLog(MaterializableIterable.class);
@@ -37,6 +43,7 @@ public class MaterializableIterable<E> implements Iterable<E> {
   private final Pipeline pipeline;
   private final ReadableSource<E> source;
   private Iterable<E> materialized;
+  private PipelineResult result;
 
   public MaterializableIterable(Pipeline pipeline, ReadableSource<E> source) {
     this.pipeline = pipeline;
@@ -44,14 +51,23 @@ public class MaterializableIterable<E> implements Iterable<E> {
     this.materialized = null;
   }
 
+  /**
+   * Returns the backing {@code ReadableSource} for this instance.
+   */
   public ReadableSource<E> getSource() {
     return source;
   }
 
+  /**
+   * Indicates whether this instance is backed by a {@code SourceTarget}.
+   */
   public boolean isSourceTarget() {
     return (source instanceof SourceTarget);
   }
-  
+
+  /**
+   * Returns the {@code Path} that contains this data, or null if no such path exists.
+   */
   public Path getPath() {
     if (source instanceof FileSourceImpl) {
       return ((FileSourceImpl) source).getPath();
@@ -61,12 +77,27 @@ public class MaterializableIterable<E> implements Iterable<E> {
     }
     return null;
   }
-  
+
+  /**
+   * Returns the {@code PipelineResult} that was generated by the Pipeline execution that
+   * created this data. This result will only be non-empty if an actual pipeline execution was
+   * performed in order to generate this data, and it will only be non-null if this method is
+   * called after the data from this Iterable is retrieved.
+   */
+  public PipelineResult getPipelineResult() {
+    return result;
+  }
+
   @Override
   public Iterator<E> iterator() {
     if (materialized == null) {
-      pipeline.run();
-      materialize();
+      this.result = pipeline.run();
+      if (result.succeeded()) {
+        materialize();
+      } else {
+        LOG.error("Pipeline run failed, returning empty iterator");
+        return Iterators.emptyIterator();
+      }
     }
     return materialized.iterator();
   }


Mime
View raw message