incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject [5/5] git commit: Extract setup for materialize
Date Fri, 06 Jul 2012 16:43:44 GMT
Extract setup for materialize

Pull out the setup logic for materializing a PCollection into a separate
method so that it can be re-used for in-memory map-side joins.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/6d701c13
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/6d701c13
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/6d701c13

Branch: refs/heads/master
Commit: 6d701c13fde22a342508c11f0de351a689ea5752
Parents: 9367826
Author: Gabriel Reid <gabriel.reid@gmail.com>
Authored: Sun Jun 24 20:31:02 2012 +0200
Committer: Gabriel Reid <gabriel.reid@gmail.com>
Committed: Fri Jul 6 17:54:29 2012 +0200

----------------------------------------------------------------------
 .../com/cloudera/crunch/impl/mr/MRPipeline.java    |  163 +++++++++------
 .../cloudera/crunch/impl/mr/MRPipelineTest.java    |   60 ++++++
 2 files changed, 159 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/6d701c13/src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java b/src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
index 420e8dc..c8ba596 100644
--- a/src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
+++ b/src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
@@ -59,9 +59,9 @@ import com.google.common.collect.Sets;
 public class MRPipeline implements Pipeline {
 
   private static final Log LOG = LogFactory.getLog(MRPipeline.class);
-  
+
   private static final Random RANDOM = new Random();
-  
+
   private final Class<?> jarClass;
   private final String name;
   private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
@@ -75,15 +75,15 @@ public class MRPipeline implements Pipeline {
   public MRPipeline(Class<?> jarClass) throws IOException {
     this(jarClass, new Configuration());
   }
-  
-  public MRPipeline(Class<?> jarClass, String name){
+
+  public MRPipeline(Class<?> jarClass, String name) {
     this(jarClass, name, new Configuration());
   }
-  
+
   public MRPipeline(Class<?> jarClass, Configuration conf) {
-	  this(jarClass, jarClass.getName(), conf);
+    this(jarClass, jarClass.getName(), conf);
   }
-  
+
   public MRPipeline(Class<?> jarClass, String name, Configuration conf) {
     this.jarClass = jarClass;
     this.name = name;
@@ -102,9 +102,9 @@ public class MRPipeline implements Pipeline {
 
   @Override
   public void setConfiguration(Configuration conf) {
-	this.conf = conf;
+    this.conf = conf;
   }
-  
+
   @Override
   public PipelineResult run() {
     MSCRPlanner planner = new MSCRPlanner(this, outputTargets);
@@ -125,8 +125,8 @@ public class MRPipeline implements Pipeline {
         boolean materialized = false;
         for (Target t : outputTargets.get(c)) {
           if (!materialized && t instanceof Source) {
-           c.materializeAt((SourceTarget) t);
-           materialized = true;
+            c.materializeAt((SourceTarget) t);
+            materialized = true;
           }
         }
       }
@@ -144,7 +144,7 @@ public class MRPipeline implements Pipeline {
     cleanup();
     return res;
   }
-  
+
   public <S> PCollection<S> read(Source<S> source) {
     return new InputCollection<S>(source, this);
   }
@@ -160,85 +160,120 @@ public class MRPipeline implements Pipeline {
   @SuppressWarnings("unchecked")
   public void write(PCollection<?> pcollection, Target target) {
     if (pcollection instanceof PGroupedTableImpl) {
-      pcollection = ((PGroupedTableImpl<?,?>) pcollection).ungroup();
+      pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup();
     } else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable)
{
-      pcollection = pcollection.parallelDo("UnionCollectionWrapper",  
-    		  (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType());	 
+      pcollection = pcollection.parallelDo("UnionCollectionWrapper",
+          (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
     }
     addOutput((PCollectionImpl<?>) pcollection, target);
   }
 
   private void addOutput(PCollectionImpl<?> impl, Target target) {
     if (!outputTargets.containsKey(impl)) {
-      outputTargets.put(impl, Sets.<Target>newHashSet());
+      outputTargets.put(impl, Sets.<Target> newHashSet());
     }
     outputTargets.get(impl).add(target);
   }
-  
+
   @Override
   public <T> Iterable<T> materialize(PCollection<T> pcollection) {
-	  
-    if (pcollection instanceof UnionCollection) {
-    	pcollection = pcollection.parallelDo("UnionCollectionWrapper",  
-	        (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType());	 
-	}  
-    PCollectionImpl<T> impl = (PCollectionImpl<T>) pcollection;
+
+    PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pcollection);
+    ReadableSourceTarget<T> srcTarget = getMaterializeSourceTarget(pcollectionImpl);
+
+    MaterializableIterable<T> c = new MaterializableIterable<T>(this, srcTarget);
+    if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) {
+      outputTargetsToMaterialize.put(pcollectionImpl, c);
+    }
+    return c;
+  }
+
+  /**
+   * Retrieve a ReadableSourceTarget that provides access to the contents of a
+   * {@link PCollection}. This is primarily intended as a helper method to
+   * {@link #materialize(PCollection)}. The underlying data of the
+   * ReadableSourceTarget may not be actually present until the pipeline is run.
+   * 
+   * @param pcollection
+   *          The collection for which the ReadableSourceTarget is to be
+   *          retrieved
+   * @return The ReadableSourceTarget
+   * @throws IllegalArgumentException
+   *           If no ReadableSourceTarget can be retrieved for the given
+   *           PCollection
+   */
+  public <T> ReadableSourceTarget<T> getMaterializeSourceTarget(PCollection<T>
pcollection) {
+    PCollectionImpl<T> impl = toPcollectionImpl(pcollection);
     SourceTarget<T> matTarget = impl.getMaterializedAt();
     if (matTarget != null && matTarget instanceof ReadableSourceTarget) {
-      return new MaterializableIterable<T>(this, (ReadableSourceTarget<T>) matTarget);
+      return (ReadableSourceTarget<T>) matTarget;
+    }
+
+    ReadableSourceTarget<T> srcTarget = null;
+    if (outputTargets.containsKey(pcollection)) {
+      for (Target target : outputTargets.get(impl)) {
+        if (target instanceof ReadableSourceTarget) {
+          srcTarget = (ReadableSourceTarget<T>) target;
+          break;
+        }
+      }
     }
-    
-	ReadableSourceTarget<T> srcTarget = null;
-	if (outputTargets.containsKey(pcollection)) {
-	  for (Target target : outputTargets.get(impl)) {
-	    if (target instanceof ReadableSourceTarget) {
-		  srcTarget = (ReadableSourceTarget) target;
-		  break;
-	    }
-	  }
-	}
-	
-	if (srcTarget == null) {
-	  SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
-	  if (!(st instanceof ReadableSourceTarget)) {
-		throw new IllegalArgumentException("The PType for the given PCollection is not readable"
-		    + " and cannot be materialized");
-	  } else {
-		srcTarget = (ReadableSourceTarget) st;
-		addOutput(impl, srcTarget);
-	  }
-	}
-	
-	MaterializableIterable<T> c = new MaterializableIterable<T>(this, srcTarget);
-	outputTargetsToMaterialize.put(impl, c);
-	return c;
+
+    if (srcTarget == null) {
+      SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
+      if (!(st instanceof ReadableSourceTarget)) {
+        throw new IllegalArgumentException("The PType for the given PCollection is not readable"
+            + " and cannot be materialized");
+      } else {
+        srcTarget = (ReadableSourceTarget<T>) st;
+        addOutput(impl, srcTarget);
+      }
+    }
+
+    return srcTarget;
+  }
+
+  /**
+   * Safely cast a PCollection into a PCollectionImpl, including handling the case of UnionCollections.
+   * @param pcollection The PCollection to be cast/transformed
+   * @return The PCollectionImpl representation
+   */
+  private <T> PCollectionImpl<T> toPcollectionImpl(PCollection<T> pcollection)
{
+    PCollectionImpl<T> pcollectionImpl = null;
+    if (pcollection instanceof UnionCollection) {
+      pcollectionImpl = (PCollectionImpl<T>) pcollection.parallelDo("UnionCollectionWrapper",
+          (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
+    } else {
+      pcollectionImpl = (PCollectionImpl<T>) pcollection;
+    }
+    return pcollectionImpl;
   }
 
   public <T> SourceTarget<T> createIntermediateOutput(PType<T> ptype) {
-	return ptype.getDefaultFileSource(createTempPath());
+    return ptype.getDefaultFileSource(createTempPath());
   }
 
   public Path createTempPath() {
     tempFileIndex++;
     return new Path(tempDirectory, "p" + tempFileIndex);
   }
-  
+
   private static Path createTempDirectory(Configuration conf) {
     Path dir = new Path("/tmp/crunch" + RANDOM.nextInt());
-	try {
-	  FileSystem.get(conf).mkdirs(dir);
-	} catch (IOException e) {
-	  LOG.error("Exception creating job output directory", e);
-	  throw new RuntimeException(e);
-	}
+    try {
+      FileSystem.get(conf).mkdirs(dir);
+    } catch (IOException e) {
+      LOG.error("Exception creating job output directory", e);
+      throw new RuntimeException(e);
+    }
     return dir;
   }
-  
+
   @Override
   public <T> void writeTextFile(PCollection<T> pcollection, String pathName)
{
     // Ensure that this is a writable pcollection instance.
-    pcollection = pcollection.parallelDo("asText", IdentityFn.<T>getInstance(),
-        WritableTypeFamily.getInstance().as(pcollection.getPType()));
+    pcollection = pcollection.parallelDo("asText", IdentityFn.<T> getInstance(), WritableTypeFamily
+        .getInstance().as(pcollection.getPType()));
     write(pcollection, At.textFile(pathName));
   }
 
@@ -256,7 +291,7 @@ public class MRPipeline implements Pipeline {
       LOG.info("Exception during cleanup", e);
     }
   }
-  
+
   public int getNextAnonymousStageId() {
     return nextAnonymousStageId++;
   }
@@ -265,7 +300,7 @@ public class MRPipeline implements Pipeline {
   public void enableDebug() {
     // Turn on Crunch runtime error catching.
     getConfiguration().setBoolean(RuntimeParameters.DEBUG, true);
-    
+
     // Write Hadoop's WARN logs to the console.
     Logger crunchInfoLogger = LogManager.getLogger("com.cloudera.crunch");
     Appender console = crunchInfoLogger.getAppender("A");
@@ -277,9 +312,9 @@ public class MRPipeline implements Pipeline {
       LOG.warn("Could not find console appender named 'A' for writing Hadoop warning logs");
     }
   }
-  
+
   @Override
   public String getName() {
-	  return name;
+    return name;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/6d701c13/src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java b/src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
new file mode 100644
index 0000000..f265460
--- /dev/null
+++ b/src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
@@ -0,0 +1,60 @@
+package com.cloudera.crunch.impl.mr;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.cloudera.crunch.SourceTarget;
+import com.cloudera.crunch.impl.mr.collect.PCollectionImpl;
+import com.cloudera.crunch.io.ReadableSourceTarget;
+import com.cloudera.crunch.types.avro.Avros;
+
+public class MRPipelineTest {
+
+  private MRPipeline pipeline;
+
+  @Before
+  public void setUp() throws IOException {
+    pipeline = spy(new MRPipeline(MRPipelineTest.class));
+  }
+
+  @Test
+  public void testGetMaterializeSourceTarget_AlreadyMaterialized() {
+    PCollectionImpl<String> materializedPcollection = mock(PCollectionImpl.class);
+    ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class);
+    when(materializedPcollection.getMaterializedAt()).thenReturn(readableSourceTarget);
+
+    assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(materializedPcollection));
+  }
+
+  @Test
+  public void testGetMaterializeSourceTarget_NotMaterialized_HasOutput() {
+
+    PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
+    ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class);
+    when(pcollection.getPType()).thenReturn(Avros.strings());
+    doReturn(readableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
+    when(pcollection.getMaterializedAt()).thenReturn(null);
+
+    assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(pcollection));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetMaterializeSourceTarget_NotMaterialized_NotReadableSourceTarget() {
+    PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
+    SourceTarget<String> nonReadableSourceTarget = mock(SourceTarget.class);
+    when(pcollection.getPType()).thenReturn(Avros.strings());
+    doReturn(nonReadableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
+    when(pcollection.getMaterializedAt()).thenReturn(null);
+
+    pipeline.getMaterializeSourceTarget(pcollection);
+  }
+
+}


Mime
View raw message