crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject crunch git commit: CRUNCH-536: Refactor CrunchControlledJob.Hook interface and make it client-accessible
Date Tue, 07 Jul 2015 19:57:34 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 1f5323bd1 -> 7f85ee581


CRUNCH-536: Refactor CrunchControlledJob.Hook interface and make it client-accessible


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/7f85ee58
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/7f85ee58
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/7f85ee58

Branch: refs/heads/master
Commit: 7f85ee5816a19eca0e87ce503ea0b03ea294433c
Parents: 1f5323b
Author: Josh Wills <jwills@apache.org>
Authored: Mon Jul 6 11:49:46 2015 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Mon Jul 6 11:49:46 2015 -0700

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/CleanTextIT.java  | 22 ++++++++++-
 .../lib/jobcontrol/CrunchControlledJob.java     |  6 +--
 .../org/apache/crunch/impl/mr/MRPipeline.java   | 25 ++++++++++++
 .../crunch/impl/mr/exec/CrunchJobHooks.java     | 41 ++++++++++++--------
 .../crunch/impl/mr/plan/JobPrototype.java       | 25 ++++++++++--
 5 files changed, 94 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/7f85ee58/crunch-core/src/it/java/org/apache/crunch/CleanTextIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/CleanTextIT.java b/crunch-core/src/it/java/org/apache/crunch/CleanTextIT.java
index 2f4004e..9d6f682 100644
--- a/crunch-core/src/it/java/org/apache/crunch/CleanTextIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/CleanTextIT.java
@@ -20,9 +20,12 @@ package org.apache.crunch;
 import static org.junit.Assert.assertEquals;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.List;
 
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
+import org.apache.crunch.impl.mr.MRJob;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.To;
 import org.apache.crunch.test.TemporaryPath;
@@ -63,7 +66,11 @@ public class CleanTextIT {
   
   @Test
   public void testMapSideOutputs() throws Exception {
-    Pipeline pipeline = new MRPipeline(CleanTextIT.class, tmpDir.getDefaultConfiguration());
+    MRPipeline pipeline = new MRPipeline(CleanTextIT.class, tmpDir.getDefaultConfiguration());
+    JobHook prepareOne = new JobHook();
+    JobHook prepareTwo = new JobHook();
+    JobHook completed = new JobHook();
+    pipeline.addPrepareHook(prepareOne).addPrepareHook(prepareTwo).addCompletionHook(completed);
     String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
     PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
     
@@ -78,5 +85,18 @@ public class CleanTextIT {
     File cleanFile = new File(cso, "part-m-00000");
     List<String> lines = Files.readLines(cleanFile, Charset.defaultCharset());
     assertEquals(LINES_IN_SHAKES, lines.size());
+    assertEquals(1, prepareOne.called);
+    assertEquals(1, prepareTwo.called);
+    assertEquals(1, completed.called);
+  }
+
+  static class JobHook implements CrunchControlledJob.Hook {
+
+    int called = 0;
+
+    @Override
+    public void run(MRJob job) throws IOException {
+      called++;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/7f85ee58/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
index dceb217..e96b9bf 100644
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
 public class CrunchControlledJob implements MRJob {
 
   public static interface Hook {
-    public void run() throws IOException;
+    public void run(MRJob job) throws IOException;
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(CrunchControlledJob.class);
@@ -285,7 +285,7 @@ public class CrunchControlledJob implements MRJob {
       }
     }
     if (isCompleted()) {
-      completionHook.run();
+      completionHook.run(this);
       this.postHookEndTimeMsec = System.currentTimeMillis();
     }
   }
@@ -335,7 +335,7 @@ public class CrunchControlledJob implements MRJob {
   protected synchronized void submit() {
     try {
       this.preHookStartTimeMsec = System.currentTimeMillis();
-      prepareHook.run();
+      prepareHook.run(this);
       this.jobStartTimeMsec = System.currentTimeMillis();
       job.submit();
       this.state = State.RUNNING;

http://git-wip-us.apache.org/repos/asf/crunch/blob/7f85ee58/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index 6cd2809..440bec7 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -23,10 +23,12 @@ import java.net.URISyntaxException;
 import java.net.URLEncoder;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
 import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import org.apache.crunch.CachingOptions;
@@ -34,6 +36,7 @@ import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PipelineExecution;
 import org.apache.crunch.PipelineResult;
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
 import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.collect.MRCollectionFactory;
@@ -57,6 +60,8 @@ public class MRPipeline extends DistributedPipeline {
   private static final Logger LOG = LoggerFactory.getLogger(MRPipeline.class);
 
   private final Class<?> jarClass;
+  private final List<CrunchControlledJob.Hook> prepareHooks;
+  private final List<CrunchControlledJob.Hook> completionHooks;
 
   /**
    * Instantiate with a default Configuration and name.
@@ -98,6 +103,26 @@ public class MRPipeline extends DistributedPipeline {
   public MRPipeline(Class<?> jarClass, String name, Configuration conf) {
     super(name, conf, new MRCollectionFactory());
     this.jarClass = jarClass;
+    this.prepareHooks = Lists.newArrayList();
+    this.completionHooks = Lists.newArrayList();
+  }
+
+  public MRPipeline addPrepareHook(CrunchControlledJob.Hook hook) {
+    this.prepareHooks.add(hook);
+    return this;
+  }
+
+  public List<CrunchControlledJob.Hook> getPrepareHooks() {
+    return prepareHooks;
+  }
+
+  public MRPipeline addCompletionHook(CrunchControlledJob.Hook hook) {
+    this.completionHooks.add(hook);
+    return this;
+  }
+
+  public List<CrunchControlledJob.Hook> getCompletionHooks() {
+    return completionHooks;
   }
 
   public MRExecutor plan() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/7f85ee58/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
index 6a15a0d..8af94c6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
@@ -18,9 +18,11 @@
 package org.apache.crunch.impl.mr.exec;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
+import org.apache.crunch.impl.mr.MRJob;
 import org.apache.crunch.impl.mr.run.RuntimeParameters;
 import org.apache.crunch.io.PathTarget;
 import org.apache.hadoop.conf.Configuration;
@@ -33,19 +35,29 @@ public final class CrunchJobHooks {
 
   private CrunchJobHooks() {}
 
-  /** Creates missing input directories before job is submitted. */
-  public static final class PrepareHook implements CrunchControlledJob.Hook {
-    private final Job job;
+  public static final class CompositeHook implements CrunchControlledJob.Hook {
+
+    private List<CrunchControlledJob.Hook> hooks;
+
+    public CompositeHook(List<CrunchControlledJob.Hook> hooks) {
+      this.hooks = hooks;
+    }
 
-    public PrepareHook(Job job) {
-      this.job = job;
+    @Override
+    public void run(MRJob job) throws IOException {
+      for (CrunchControlledJob.Hook hook : hooks) {
+        hook.run(job);
+      }
     }
+  }
 
+  /** Creates missing input directories before job is submitted. */
+  public static final class PrepareHook implements CrunchControlledJob.Hook {
     @Override
-    public void run() throws IOException {
-      Configuration conf = job.getConfiguration();
+    public void run(MRJob job) throws IOException {
+      Configuration conf = job.getJob().getConfiguration();
       if (conf.getBoolean(RuntimeParameters.CREATE_DIR, false)) {
-        Path[] inputPaths = FileInputFormat.getInputPaths(job);
+        Path[] inputPaths = FileInputFormat.getInputPaths(job.getJob());
         for (Path inputPath : inputPaths) {
           FileSystem fs = inputPath.getFileSystem(conf);
           if (!fs.exists(inputPath)) {
@@ -61,25 +73,20 @@ public final class CrunchJobHooks {
 
   /** Moving output files produced by the MapReduce job to specified directories. */
   public static final class CompletionHook implements CrunchControlledJob.Hook {
-    private final Job job;
     private final Path workingPath;
     private final Map<Integer, PathTarget> multiPaths;
-    private final boolean mapOnlyJob;
 
-    public CompletionHook(Job job, Path workingPath, Map<Integer, PathTarget> multiPaths,
-        boolean mapOnlyJob) {
-      this.job = job;
+    public CompletionHook(Path workingPath, Map<Integer, PathTarget> multiPaths) {
       this.workingPath = workingPath;
       this.multiPaths = multiPaths;
-      this.mapOnlyJob = mapOnlyJob;
     }
 
     @Override
-    public void run() throws IOException {
-      handleMultiPaths();
+    public void run(MRJob job) throws IOException {
+      handleMultiPaths(job.getJob());
     }
 
-    private synchronized void handleMultiPaths() throws IOException {
+    private synchronized void handleMultiPaths(Job job) throws IOException {
       try {
         if (job.isSuccessful()) {
           if (!multiPaths.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/7f85ee58/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index 2863e00..d23de3b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -27,6 +27,7 @@ import org.apache.crunch.Pipeline;
 import org.apache.crunch.Target;
 import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
 import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.collect.DoTable;
 import org.apache.crunch.impl.dist.collect.MRCollection;
 import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
@@ -130,7 +131,7 @@ class JobPrototype {
   }
 
   public CrunchControlledJob getCrunchJob(
-      Class<?> jarClass, Configuration conf, Pipeline pipeline, int numOfJobs) throws IOException {
+      Class<?> jarClass, Configuration conf, MRPipeline pipeline, int numOfJobs) throws IOException {
     if (job == null) {
       job = build(jarClass, conf, pipeline, numOfJobs);
       for (JobPrototype proto : dependencies) {
@@ -141,7 +142,7 @@ class JobPrototype {
   }
 
   private CrunchControlledJob build(
-      Class<?> jarClass, Configuration conf, Pipeline pipeline, int numOfJobs) throws IOException {
+      Class<?> jarClass, Configuration conf, MRPipeline pipeline, int numOfJobs) throws IOException {
     Job job = new Job(conf);
     conf = job.getConfiguration();
     conf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, workingPath.toString());
@@ -229,13 +230,29 @@ class JobPrototype {
     }
     JobNameBuilder jobNameBuilder = createJobNameBuilder(conf, pipeline.getName(), inputNodes, reduceNode, numOfJobs);
 
+    CrunchControlledJob.Hook prepareHook = getHook(new CrunchJobHooks.PrepareHook(), pipeline.getPrepareHooks());
+    CrunchControlledJob.Hook completionHook = getHook(
+        new CrunchJobHooks.CompletionHook(outputPath, outputHandler.getMultiPaths()),
+        pipeline.getCompletionHooks());
     return new CrunchControlledJob(
         jobID,
         job,
         jobNameBuilder,
         allTargets,
-        new CrunchJobHooks.PrepareHook(job),
-        new CrunchJobHooks.CompletionHook(job, outputPath, outputHandler.getMultiPaths(), group == null));
+        prepareHook,
+        completionHook);
+  }
+
+  private static CrunchControlledJob.Hook getHook(
+      CrunchControlledJob.Hook base,
+      List<CrunchControlledJob.Hook> optional) {
+    if (optional.isEmpty()) {
+      return base;
+    }
+    List<CrunchControlledJob.Hook> hooks = Lists.newArrayList();
+    hooks.add(base);
+    hooks.addAll(optional);
+    return new CrunchJobHooks.CompositeHook(hooks);
   }
 
   private void serialize(List<DoNode> nodes, Configuration conf, Path workingPath, NodeContext context)


Mime
View raw message