crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-247: Enable the planner to take advantage of to-be-materialized outputs during job planning.
Date Wed, 07 Aug 2013 03:51:49 GMT
Updated Branches:
  refs/heads/master 92ea0592f -> 98458852a


CRUNCH-247: Enable the planner to take advantage of to-be-materialized outputs during
job planning.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/98458852
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/98458852
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/98458852

Branch: refs/heads/master
Commit: 98458852a7c6d774de7716eef855467c0df22c87
Parents: 92ea059
Author: Josh Wills <jwills@apache.org>
Authored: Tue Aug 6 15:25:22 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Tue Aug 6 20:50:05 2013 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/DependentSourcesIT.java   | 76 ++++++++++++++++++++
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java |  9 ++-
 2 files changed, 84 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/98458852/crunch-core/src/it/java/org/apache/crunch/DependentSourcesIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/DependentSourcesIT.java b/crunch-core/src/it/java/org/apache/crunch/DependentSourcesIT.java
new file mode 100644
index 0000000..36bd7a7
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/DependentSourcesIT.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.apache.crunch.types.avro.Avros.strings;
+import static org.apache.crunch.types.avro.Avros.tableOf;
+
+import java.util.List;
+
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.impl.mr.MRJob;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.MRPipelineExecution;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class DependentSourcesIT {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testRun() throws Exception {
+    run(new MRPipeline(DependentSourcesIT.class, tmpDir.getDefaultConfiguration()),
+        tmpDir.copyResourcePath("shakes.txt"),
+        tmpDir.getFileName("out"));
+  }
+  
+  public static void run(MRPipeline p, Path inputPath, String out) throws Exception {
+     PCollection<String> in = p.read(From.textFile(inputPath));
+     PTable<String, String> op = in.parallelDo("op1", new DoFn<String, Pair<String, String>>() {
+      @Override
+      public void process(String input, Emitter<Pair<String, String>> emitter) {
+        if (input.length() > 5) {
+          emitter.emit(Pair.of(input.substring(0, 3), input));
+        }
+      } 
+     }, tableOf(strings(), strings()));
+     
+     SourceTarget src = (SourceTarget)((MaterializableIterable<Pair<String, String>>) op.materialize()).getSource();
+     
+     op = op.parallelDo("op2", IdentityFn.<Pair<String,String>>getInstance(), tableOf(strings(), strings()),
+         ParallelDoOptions.builder().sourceTargets(src).build());
+     
+     PCollection<String> output = op.values();
+     output.write(To.textFile(out));
+     MRPipelineExecution exec = p.runAsync();
+     exec.waitUntilDone();
+     List<MRJob> jobs = exec.getJobs();
+     Assert.assertEquals(2, jobs.size());
+     Assert.assertEquals(0, jobs.get(0).getJob().getNumReduceTasks());
+     Assert.assertEquals(0, jobs.get(1).getJob().getNumReduceTasks());     
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/98458852/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index b5b37d7..5ad5ca1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -136,8 +136,15 @@ public class MSCRPlanner {
       }
       assignments.putAll(newAssignments);
       
-      // Remove completed outputs.
+      // Remove completed outputs and mark materialized output locations
+      // for subsequent job processing.
       for (PCollectionImpl<?> output : currentStage) {
+        if (toMaterialize.containsKey(output)) {
+          MaterializableIterable mi = toMaterialize.get(output);
+          if (mi.isSourceTarget()) {
+            output.materializeAt((SourceTarget) mi.getSource());
+          }
+        }
         targetDeps.remove(output);
       }
     }


Mime
View raw message