incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-128: Add an explicit dependency between the output of one MapReduce job and the start of another one for cases like mapside joins and total orderings, where we need a file to exist on the filesystem before another process takes advantage of it.
Date Tue, 08 Jan 2013 04:28:09 GMT
Updated Branches:
  refs/heads/master 2bf556177 -> 2bc04f98c


CRUNCH-128: Add an explicit dependency between the output of one MapReduce job
and the start of another one for cases like mapside joins and total orderings,
where we need a file to exist on the filesystem before another process takes
advantage of it.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/2bc04f98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/2bc04f98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/2bc04f98

Branch: refs/heads/master
Commit: 2bc04f98c1b874f4039ff405b5fd50098bd67447
Parents: 2bf5561
Author: Josh Wills <jwills@apache.org>
Authored: Sun Dec 9 19:25:53 2012 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Mon Jan 7 19:06:35 2013 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/lib/join/MapsideJoinIT.java  |   56 +++++--
 .../main/java/org/apache/crunch/PCollection.java   |   34 ++++
 .../java/org/apache/crunch/ParallelDoOptions.java  |   62 ++++++++
 .../org/apache/crunch/impl/mem/MemPipeline.java    |    1 +
 .../crunch/impl/mem/collect/MemCollection.java     |   13 ++
 .../crunch/impl/mr/collect/DoCollectionImpl.java   |   11 ++-
 .../apache/crunch/impl/mr/collect/DoTableImpl.java |    8 +-
 .../crunch/impl/mr/collect/PCollectionImpl.java    |   35 ++++-
 .../apache/crunch/impl/mr/collect/PTableBase.java  |    5 +
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java    |  121 ++++++++++-----
 .../org/apache/crunch/io/ReadableSourceTarget.java |    1 +
 .../org/apache/crunch/lib/join/MapsideJoin.java    |   72 ++++++----
 .../crunch/materialize/MaterializableIterable.java |    9 +
 13 files changed, 341 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
index 9147baf..7d5d94d 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
@@ -29,7 +29,9 @@ import org.apache.crunch.MapFn;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
 import org.apache.crunch.fn.FilterFns;
+import org.apache.crunch.fn.MapValuesFn;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.TemporaryPath;
@@ -64,21 +66,33 @@ public class MapsideJoinIT {
   }
 
   private static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
-
     @Override
     public Pair<Integer, String> map(String input) {
       String[] fields = input.split("\\|");
       return Pair.of(Integer.parseInt(fields[0]), fields[1]);
     }
-
   }
 
+  private static class CapOrdersFn extends MapValuesFn<Integer, String, String> {
+    @Override
+    public String map(String v) {
+      return v.toUpperCase();
+    }
+  }
+  
+  private static class ConcatValuesFn extends MapValuesFn<Integer, Pair<String, String>, String> {
+    @Override
+    public String map(Pair<String, String> v) {
+      return v.toString();
+    }
+  }
+  
   @Rule
   public TemporaryPath tmpDir = TemporaryPaths.create();
 
-  @Test(expected = CrunchRuntimeException.class)
-  public void testNonMapReducePipeline() {
-    runMapsideJoin(MemPipeline.getInstance());
+  @Test
+  public void testMapSideJoin_MemPipeline() {
+    runMapsideJoin(MemPipeline.getInstance(), true);
   }
 
   @Test
@@ -95,27 +109,39 @@ public class MapsideJoinIT {
     List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined.materialize());
 
     assertTrue(materializedJoin.isEmpty());
-
   }
 
   @Test
   public void testMapsideJoin() throws IOException {
-    runMapsideJoin(new MRPipeline(MapsideJoinIT.class, tmpDir.getDefaultConfiguration()));
+    runMapsideJoin(new MRPipeline(MapsideJoinIT.class, tmpDir.getDefaultConfiguration()), false);
   }
 
-  private void runMapsideJoin(Pipeline pipeline) {
+  private void runMapsideJoin(Pipeline pipeline, boolean inMemory) {
     PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
     PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+    
+    PTable<Integer, String> custOrders = MapsideJoin.join(customerTable, orderTable)
+        .parallelDo("concat", new ConcatValuesFn(), Writables.tableOf(Writables.ints(), Writables.strings()));
 
-    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable, orderTable);
+    PTable<Integer, String> ORDER_TABLE = orderTable.parallelDo(new CapOrdersFn(), orderTable.getPTableType());
+    
+    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(custOrders, ORDER_TABLE);
 
     List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
-    expectedJoinResult.add(Pair.of(111, Pair.of("John Doe", "Corn flakes")));
-    expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet paper")));
-    expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet plunger")));
-    expectedJoinResult.add(Pair.of(333, Pair.of("Someone Else", "Toilet brush")));
-
-    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(joined.materialize());
+    expectedJoinResult.add(Pair.of(111, Pair.of("[John Doe,Corn flakes]", "CORN FLAKES")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PAPER")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet paper]", "TOILET PLUNGER")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PAPER")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("[Jane Doe,Toilet plunger]", "TOILET PLUNGER")));
+    expectedJoinResult.add(Pair.of(333, Pair.of("[Someone Else,Toilet brush]", "TOILET BRUSH")));
+    Iterable<Pair<Integer, Pair<String, String>>> iter = joined.materialize();
+    
+    PipelineResult res = pipeline.run();
+    if (!inMemory) {
+      assertEquals(2, res.getStageResults().size());
+    }
+     
+    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(iter);
     Collections.sort(joinedResultList);
 
     assertEquals(expectedJoinResult, joinedResultList);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PCollection.java b/crunch/src/main/java/org/apache/crunch/PCollection.java
index 00c300f..798c262 100644
--- a/crunch/src/main/java/org/apache/crunch/PCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/PCollection.java
@@ -65,6 +65,23 @@ public interface PCollection<S> {
    * @return a new {@code PCollection}
    */
   <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type);
+  
+  /**
+   * Applies the given doFn to the elements of this {@code PCollection} and
+   * returns a new {@code PCollection} that is the output of this processing.
+   *
+   * @param name
+   *          An identifier for this processing step, useful for debugging
+   * @param doFn
+   *          The {@code DoFn} to apply
+   * @param type
+   *          The {@link PType} of the resulting {@code PCollection}
+   * @param options
+   *          Optional information that is needed for certain pipeline operations
+   * @return a new {@code PCollection}
+   */
+  <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type,
+      ParallelDoOptions options);
 
   /**
    * Similar to the other {@code parallelDo} instance, but returns a
@@ -91,6 +108,23 @@ public interface PCollection<S> {
    * @return a new {@code PTable}
    */
   <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type);
+  
+  /**
+   * Similar to the other {@code parallelDo} instance, but returns a
+   * {@code PTable} instance instead of a {@code PCollection}.
+   *
+   * @param name
+   *          An identifier for this processing step
+   * @param doFn
+   *          The {@code DoFn} to apply
+   * @param type
+   *          The {@link PTableType} of the resulting {@code PTable}
+   * @param options
+   *          Optional information that is needed for certain pipeline operations
+   * @return a new {@code PTable}
+   */
+  <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type,
+      ParallelDoOptions options);
 
   /**
    * Write the contents of this {@code PCollection} to the given {@code Target},

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/ParallelDoOptions.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/ParallelDoOptions.java b/crunch/src/main/java/org/apache/crunch/ParallelDoOptions.java
new file mode 100644
index 0000000..2407b3a
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/ParallelDoOptions.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.Collections;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Container class that includes optional information about a {@code parallelDo} operation
+ * applied to a {@code PCollection}. Primarily used within the Crunch framework
+ * itself for certain types of advanced processing operations, such as in-memory joins
+ * that require reading a file from the filesystem into a {@code DoFn}.
+ */
+public class ParallelDoOptions {
+  private final Set<SourceTarget<?>> sourceTargets;
+  
+  private ParallelDoOptions(Set<SourceTarget<?>> sourceTargets) {
+    this.sourceTargets = sourceTargets;
+  }
+  
+  public Set<SourceTarget<?>> getSourceTargets() {
+    return sourceTargets;
+  }
+  
+  public static Builder builder() {
+    return new Builder();
+  }
+  
+  public static class Builder {
+    private Set<SourceTarget<?>> sourceTargets;
+    
+    public Builder() {
+      this.sourceTargets = Sets.newHashSet();
+    }
+    
+    public Builder sourceTargets(SourceTarget<?>... sourceTargets) {
+      Collections.addAll(this.sourceTargets, sourceTargets);
+      return this;
+    }
+    
+    public ParallelDoOptions build() {
+      return new ParallelDoOptions(sourceTargets);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 77c41ce..3e28a0c 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -28,6 +28,7 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.PipelineResult;
 import org.apache.crunch.Source;
+import org.apache.crunch.SourceTarget;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.Target;
 import org.apache.crunch.impl.mem.collect.MemCollection;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index 35f64ce..ffc38ae 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -31,6 +31,7 @@ import org.apache.crunch.PCollection;
 import org.apache.crunch.PObject;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.Target;
 import org.apache.crunch.fn.ExtractKeyFn;
@@ -95,6 +96,12 @@ public class MemCollection<S> implements PCollection<S> {
 
   @Override
   public <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type) {
+    return parallelDo(name, doFn, type, ParallelDoOptions.builder().build());
+  }
+  
+  @Override
+  public <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type,
+      ParallelDoOptions options) {
     InMemoryEmitter<T> emitter = new InMemoryEmitter<T>();
     doFn.initialize();
     for (S s : collect) {
@@ -111,6 +118,12 @@ public class MemCollection<S> implements PCollection<S> {
 
   @Override
   public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type) {
+    return parallelDo(name, doFn, type, ParallelDoOptions.builder().build());
+  }
+  
+  @Override
+  public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type,
+      ParallelDoOptions options) {
     InMemoryEmitter<Pair<K, V>> emitter = new InMemoryEmitter<Pair<K, V>>();
     doFn.setContext(getInMemoryContext(getPipeline().getConfiguration()));
     doFn.initialize();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
index 1f4fea2..7b8f2ea 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
@@ -18,12 +18,16 @@
 package org.apache.crunch.impl.mr.collect;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.crunch.DoFn;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.SourceTarget;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.types.PType;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 public class DoCollectionImpl<S> extends PCollectionImpl<S> {
 
@@ -32,7 +36,12 @@ public class DoCollectionImpl<S> extends PCollectionImpl<S> {
   private final PType<S> ntype;
 
   <T> DoCollectionImpl(String name, PCollectionImpl<T> parent, DoFn<T, S> fn, PType<S> ntype) {
-    super(name);
+    this(name, parent, fn, ntype, ParallelDoOptions.builder().build());
+  }
+  
+  <T> DoCollectionImpl(String name, PCollectionImpl<T> parent, DoFn<T, S> fn, PType<S> ntype,
+      ParallelDoOptions options) {
+    super(name, options);
     this.parent = (PCollectionImpl<Object>) parent;
     this.fn = (DoFn<Object, S>) fn;
     this.ntype = ntype;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
index 1d19580..176643b 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
@@ -23,6 +23,7 @@ import org.apache.crunch.CombineFn;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
@@ -36,7 +37,12 @@ public class DoTableImpl<K, V> extends PTableBase<K, V> implements PTable<K, V>
   private final PTableType<K, V> type;
 
   <S> DoTableImpl(String name, PCollectionImpl<S> parent, DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype) {
-    super(name);
+    this(name, parent, fn, ntype, ParallelDoOptions.builder().build());
+  }
+  
+  <S> DoTableImpl(String name, PCollectionImpl<S> parent, DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype,
+      ParallelDoOptions options) {
+    super(name, options);
     this.parent = parent;
     this.fn = fn;
     this.type = ntype;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index 79fe72b..8ad6692 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -20,6 +20,7 @@ package org.apache.crunch.impl.mr.collect;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,6 +31,7 @@ import org.apache.crunch.PCollection;
 import org.apache.crunch.PObject;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
@@ -43,6 +45,7 @@ import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 public abstract class PCollectionImpl<S> implements PCollection<S> {
 
@@ -51,9 +54,15 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   private final String name;
   protected MRPipeline pipeline;
   private SourceTarget<S> materializedAt;
-
+  private final ParallelDoOptions options;
+  
   public PCollectionImpl(String name) {
+    this(name, ParallelDoOptions.builder().build());
+  }
+  
+  public PCollectionImpl(String name, ParallelDoOptions options) {
     this.name = name;
+    this.options = options;
   }
 
   @Override
@@ -86,7 +95,13 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type) {
     return new DoCollectionImpl<T>(name, getChainingCollection(), fn, type);
   }
-
+  
+  @Override
+  public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type,
+      ParallelDoOptions options) {
+    return new DoCollectionImpl<T>(name, getChainingCollection(), fn, type, options);
+  }
+  
   @Override
   public <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
     MRPipeline pipeline = (MRPipeline) getPipeline();
@@ -98,6 +113,12 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
     return new DoTableImpl<K, V>(name, getChainingCollection(), fn, type);
   }
 
+  @Override
+  public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> fn, PTableType<K, V> type,
+      ParallelDoOptions options) {
+    return new DoTableImpl<K, V>(name, getChainingCollection(), fn, type, options);
+  }
+
   public PCollection<S> write(Target target) {
     if (materializedAt != null) {
       getPipeline().write(new InputCollection<S>(materializedAt, (MRPipeline) getPipeline()), target);
@@ -194,7 +215,15 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
     }
     return pipeline;
   }
-
+  
+  public Set<SourceTarget<?>> getTargetDependencies() {
+    Set<SourceTarget<?>> targetDeps = options.getSourceTargets();
+    for (PCollectionImpl<?> parent : getParents()) {
+      targetDeps = Sets.union(targetDeps, parent.getTargetDependencies());
+    }
+    return targetDeps;
+  }
+  
   public int getDepth() {
     int parentMax = 0;
     for (PCollectionImpl parent : getParents()) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
index 03c2fdc..69ea8a3 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
@@ -27,6 +27,7 @@ import org.apache.crunch.PCollection;
 import org.apache.crunch.PObject;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.Target;
 import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.lib.Cogroup;
@@ -44,6 +45,10 @@ abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> implements P
     super(name);
   }
 
+  public PTableBase(String name, ParallelDoOptions options) {
+    super(name, options);
+  }
+  
   public PType<K> getKeyType() {
     return getPTableType().getKeyType();
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 7fe2809..3718ec2 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -35,14 +36,23 @@ import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
 public class MSCRPlanner {
 
+  private final MRPipeline pipeline;
+  private final Map<PCollectionImpl<?>, Set<Target>> outputs;
+
+  public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs) {
+    this.pipeline = pipeline;
+    this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR);
+    this.outputs.putAll(outputs);
+  }
+
   // Used to ensure that we always build pipelines starting from the deepest
-  // outputs, which
-  // helps ensure that we handle intermediate outputs correctly.
+  // outputs, which helps ensure that we handle intermediate outputs correctly.
   private static final Comparator<PCollectionImpl<?>> DEPTH_COMPARATOR = new Comparator<PCollectionImpl<?>>() {
     @Override
     public int compare(PCollectionImpl<?> left, PCollectionImpl<?> right) {
@@ -50,55 +60,88 @@ public class MSCRPlanner {
       if (cmp == 0) {
         // Ensure we don't throw away two output collections at the same depth.
         // Using the collection name would be nicer here, but names aren't
-        // necessarily unique
+        // necessarily unique.
         cmp = new Integer(right.hashCode()).compareTo(left.hashCode());
       }
       return cmp;
     }
   };  
 
-  private final MRPipeline pipeline;
-  private final Map<PCollectionImpl<?>, Set<Target>> outputs;
-
-  public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs) {
-    this.pipeline = pipeline;
-    this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR);
-    this.outputs.putAll(outputs);
-  }
-
   public MRExecutor plan(Class<?> jarClass, Configuration conf) throws IOException {
-    // Walk the current plan tree and build a graph in which the vertices are
-    // sources, targets, and GBK operations.
-    GraphBuilder graphBuilder = new GraphBuilder();
-    for (PCollectionImpl<?> output : outputs.keySet()) {
-      graphBuilder.visitOutput(output);
+    Map<PCollectionImpl<?>, Set<SourceTarget<?>>> targetDeps = Maps.newTreeMap(DEPTH_COMPARATOR);
+    for (PCollectionImpl<?> pcollect : outputs.keySet()) {
+      targetDeps.put(pcollect, pcollect.getTargetDependencies());
     }
-    Graph baseGraph = graphBuilder.getGraph();
-    
-    // Create a new graph that splits up up dependent GBK nodes.
-    Graph graph = prepareFinalGraph(baseGraph);
-    
-    // Break the graph up into connected components.
-    List<List<Vertex>> components = graph.connectedComponents();
     
-
-    // For each component, we will create one or more job prototypes,
-    // depending on its profile.
-    // For dependency handling, we only need to care about which
-    // job prototype a particular GBK is assigned to.
     Multimap<Vertex, JobPrototype> assignments = HashMultimap.create();
-    for (List<Vertex> component : components) {
-      assignments.putAll(constructJobPrototypes(component));
+    Multimap<PCollectionImpl<?>, Vertex> protoDependency = HashMultimap.create();
+    while (!targetDeps.isEmpty()) {
+      Set<Target> allTargets = Sets.newHashSet();
+      for (PCollectionImpl<?> pcollect : targetDeps.keySet()) {
+        allTargets.addAll(outputs.get(pcollect));
+      }
+      GraphBuilder graphBuilder = new GraphBuilder();
+      
+      // Walk the current plan tree and build a graph in which the vertices are
+      // sources, targets, and GBK operations.
+      Set<PCollectionImpl<?>> currentStage = Sets.newHashSet();
+      Set<PCollectionImpl<?>> laterStage = Sets.newHashSet();
+      for (PCollectionImpl<?> output : targetDeps.keySet()) {
+        if (Sets.intersection(allTargets, targetDeps.get(output)).isEmpty()) {
+          graphBuilder.visitOutput(output);
+          currentStage.add(output);
+        } else {
+          laterStage.add(output);
+        }
+      }
+      
+      Graph baseGraph = graphBuilder.getGraph();
+      
+      // Create a new graph that splits up up dependent GBK nodes.
+      Graph graph = prepareFinalGraph(baseGraph);
+      
+      // Break the graph up into connected components.
+      List<List<Vertex>> components = graph.connectedComponents();
+      
+      // For each component, we will create one or more job prototypes,
+      // depending on its profile.
+      // For dependency handling, we only need to care about which
+      // job prototype a particular GBK is assigned to.
+      for (List<Vertex> component : components) {
+        assignments.putAll(constructJobPrototypes(component));
+      }
+
+      // Add in the job dependency information here.
+      for (Map.Entry<Vertex, JobPrototype> e : assignments.entries()) {
+        JobPrototype current = e.getValue();
+        List<Vertex> parents = graph.getParents(e.getKey());
+        for (Vertex parent : parents) {
+          for (JobPrototype parentJobProto : assignments.get(parent)) {
+            current.addDependency(parentJobProto);
+          }
+        }
+      }
+      
+      // Add cross-stage dependencies.
+      for (PCollectionImpl<?> output : currentStage) {
+        Set<Target> targets = outputs.get(output);
+        Vertex vertex = graph.getVertexAt(output);
+        for (PCollectionImpl<?> later : laterStage) {
+          if (!Sets.intersection(targets, targetDeps.get(later)).isEmpty()) {
+            protoDependency.put(later, vertex);
+          }
+        }
+        targetDeps.remove(output);
+      }
     }
     
-
-    // Add in the job dependency information here.
-    for (Map.Entry<Vertex, JobPrototype> e : assignments.entries()) {
-      JobPrototype current = e.getValue();
-      List<Vertex> parents = graph.getParents(e.getKey());
-      for (Vertex parent : parents) {
-        for (JobPrototype parentJobProto : assignments.get(parent)) {
-          current.addDependency(parentJobProto);
+    // Cross-job dependencies.
+    for (Entry<PCollectionImpl<?>, Vertex> pd : protoDependency.entries()) {
+      Vertex d = new Vertex(pd.getKey());
+      Vertex dj = pd.getValue();
+      for (JobPrototype parent : assignments.get(dj)) {
+        for (JobPrototype child : assignments.get(d)) {
+          child.addDependency(parent);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
index 95c90aa..ac979c3 100644
--- a/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
@@ -18,6 +18,7 @@
 package org.apache.crunch.io;
 
 import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.fs.Path;
 
 /**
  * An interface that indicates that a {@code SourceTarget} instance can be read

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
index 1acbf2d..8116ea1 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
@@ -24,16 +24,18 @@ import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
-import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.crunch.io.impl.SourcePathTargetImpl;
+import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 
 /**
@@ -64,37 +66,46 @@ public class MapsideJoin {
    * @return A table keyed on the join key, containing pairs of joined values
    */
   public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U>
left, PTable<K, V> right) {
-
-    if (!(right.getPipeline() instanceof MRPipeline)) {
-      throw new CrunchRuntimeException("Map-side join is only supported within a MapReduce
context");
+    PTypeFamily tf = left.getTypeFamily();
+    Iterable<Pair<K, V>> iterable = right.materialize();
+
+    if (iterable instanceof MaterializableIterable) {
+      MaterializableIterable<Pair<K, V>> mi = (MaterializableIterable<Pair<K,
V>>) iterable;
+      MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(mi.getPath().toString(),
right.getPType());
+      ParallelDoOptions options = ParallelDoOptions.builder()
+          .sourceTargets(mi.getSourceTarget())
+          .build();
+      return left.parallelDo("mapjoin", mapJoinDoFn,
+          tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())),
+          options);
+    } else { // in-memory pipeline
+      return left.parallelDo(new InMemoryJoinFn<K, U, V>(iterable),
+          tf.tableOf(left.getKeyType(), tf.pairs(left.getValueType(), right.getValueType())));
     }
+  }
 
-    MRPipeline pipeline = (MRPipeline) right.getPipeline();
-    pipeline.materialize(right);
-
-    // TODO Move necessary logic to MRPipeline so that we can theoretically
-    // optimize his by running the setup of multiple map-side joins concurrently
-    pipeline.run();
+  static class InMemoryJoinFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K,
Pair<U, V>>> {
 
-    ReadableSourceTarget<Pair<K, V>> readableSourceTarget = pipeline.getMaterializeSourceTarget(right);
-    if (!(readableSourceTarget instanceof SourcePathTargetImpl)) {
-      throw new CrunchRuntimeException("Right-side contents can't be read from a path");
+    private Multimap<K, V> joinMap;
+    
+    public InMemoryJoinFn(Iterable<Pair<K, V>> iterable) {
+      joinMap = HashMultimap.create();
+      for (Pair<K, V> joinPair : iterable) {
+        joinMap.put(joinPair.first(), joinPair.second());
+      }
+    }
+    
+    @Override
+    public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>>
emitter) {
+      K key = input.first();
+      U value = input.second();
+      for (V joinValue : joinMap.get(key)) {
+        Pair<U, V> valuePair = Pair.of(value, joinValue);
+        emitter.emit(Pair.of(key, valuePair));
+      }
     }
-
-    // Suppress warnings because we've just checked this cast via instanceof
-    @SuppressWarnings("unchecked")
-    SourcePathTargetImpl<Pair<K, V>> sourcePathTarget = (SourcePathTargetImpl<Pair<K,
V>>) readableSourceTarget;
-
-    Path path = sourcePathTarget.getPath();
-    DistributedCache.addCacheFile(path.toUri(), pipeline.getConfiguration());
-
-    MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.getName(),
right.getPType());
-    PTypeFamily typeFamily = left.getTypeFamily();
-    return left.parallelDo("mapjoin", mapJoinDoFn,
-        typeFamily.tableOf(left.getKeyType(), typeFamily.pairs(left.getValueType(), right.getValueType())));
-
   }
-
+  
   static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K,
Pair<U, V>>> {
 
     private String inputPath;
@@ -122,6 +133,11 @@ public class MapsideJoin {
     }
 
     @Override
+    public void configure(Configuration conf) {
+      DistributedCache.addCacheFile(new Path(inputPath).toUri(), conf);
+    }
+    
+    @Override
     public void initialize() {
       super.initialize();
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bc04f98/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
index 2d6c573..0ed29e3 100644
--- a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
+++ b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
@@ -24,7 +24,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.hadoop.fs.Path;
 
 public class MaterializableIterable<E> implements Iterable<E> {
 
@@ -44,6 +46,13 @@ public class MaterializableIterable<E> implements Iterable<E> {
     return sourceTarget;
   }
 
+  public Path getPath() {
+    if (sourceTarget instanceof PathTarget) {
+      return ((PathTarget) sourceTarget).getPath();
+    }
+    return null;
+  }
+  
   @Override
   public Iterator<E> iterator() {
     if (materialized == null) {


Mime
View raw message