crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-355: Make the sequence ids correct in the Crunch-generated job names
Date Thu, 29 May 2014 16:45:49 GMT
Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 652963cfb -> fd9778c90


CRUNCH-355: Make the sequence ids correct in the Crunch-generated job names


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/fd9778c9
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/fd9778c9
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/fd9778c9

Branch: refs/heads/apache-crunch-0.8
Commit: fd9778c908a0a192ffd5c64d4f49df65d025c93b
Parents: 652963c
Author: Josh Wills <jwills@apache.org>
Authored: Tue May 27 20:16:37 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Thu May 29 09:44:12 2014 -0700

----------------------------------------------------------------------
 .../lib/jobcontrol/CrunchControlledJob.java         | 16 +++++++---------
 .../mapreduce/lib/jobcontrol/CrunchJobControl.java  |  3 +++
 .../apache/crunch/impl/mr/plan/JobNameBuilder.java  | 11 +++++++++--
 .../apache/crunch/impl/mr/plan/JobPrototype.java    | 13 +++++++++----
 .../lib/jobcontrol/CrunchJobControlTest.java        |  2 ++
 .../crunch/impl/mr/plan/JobNameBuilderTest.java     |  6 +++---
 6 files changed, 33 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/fd9778c9/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
index 5dbb43e..06d886d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.impl.mr.MRJob;
+import org.apache.crunch.impl.mr.plan.JobNameBuilder;
 import org.apache.crunch.impl.mr.run.RuntimeParameters;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
@@ -53,6 +54,8 @@ public class CrunchControlledJob implements MRJob {
 
   private final int jobID;
   private final Job job; // mapreduce job to be executed.
+  private final JobNameBuilder jobNameBuilder;
+
   // the jobs the current job depends on
   private final List<CrunchControlledJob> dependingJobs;
   private final Hook prepareHook;
@@ -79,9 +82,10 @@ public class CrunchControlledJob implements MRJob {
    * @param completionHook
    *          a piece of code that will run after this job gets completed.
    */
-  public CrunchControlledJob(int jobID, Job job, Hook prepareHook, Hook completionHook) {
+  public CrunchControlledJob(int jobID, Job job, JobNameBuilder jobNameBuilder, Hook prepareHook, Hook completionHook) {
     this.jobID = jobID;
     this.job = job;
+    this.jobNameBuilder = jobNameBuilder;
     this.dependingJobs = Lists.newArrayList();
     this.prepareHook = prepareHook;
     this.completionHook = completionHook;
@@ -118,14 +122,8 @@ public class CrunchControlledJob implements MRJob {
     return job.getJobName();
   }
 
-  /**
-   * Set the job name for this job.
-   *
-   * @param jobName
-   *          the job name
-   */
-  public void setJobName(String jobName) {
-    job.setJobName(jobName);
+  public void setJobSequence(int jobSequence) {
+    this.job.setJobName(jobNameBuilder.jobSequence(jobSequence).build());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/crunch/blob/fd9778c9/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
index ce7a6d9..8a650c7 100644
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -52,6 +52,7 @@ public class CrunchJobControl {
 
   private final String groupName;
   private final int maxRunningJobs;
+  private int jobSequence = 1;
 
   /**
    * Construct a job control for a group of jobs.
@@ -198,6 +199,8 @@ public class CrunchJobControl {
       // stop submitting new jobs and wait until some running job completes.
       if (runningJobs.size() < maxRunningJobs) {
         // Submitting Job to Hadoop
+        nextJob.setJobSequence(jobSequence);
+        jobSequence++;
         nextJob.submit();
       }
       this.addToQueue(nextJob);

http://git-wip-us.apache.org/repos/asf/crunch/blob/fd9778c9/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java
index 6fac1be..4a77231 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration;
  * Visitor that traverses the {@code DoNode} instances in a job and builds a
  * String that identifies the stages of the pipeline that belong to this job.
  */
-class JobNameBuilder {
+public class JobNameBuilder {
 
   private static final Joiner JOINER = Joiner.on("+");
   private static final Joiner CHILD_JOINER = Joiner.on("/");
@@ -36,6 +36,7 @@ class JobNameBuilder {
 
   private final String pipelineName;
   private final int jobID;
+  private int jobSequence;
   private final int numOfJobs;
   List<String> rootStack = Lists.newArrayList();
   private final int maxStackNameLength;
@@ -48,6 +49,11 @@ class JobNameBuilder {
         PlanningParameters.JOB_NAME_MAX_STACK_LENGTH, DEFAULT_JOB_NAME_MAX_STACK_LENGTH);
   }
 
+  public JobNameBuilder jobSequence(int jobSequence) {
+    this.jobSequence = jobSequence;
+    return this;
+  }
+
   public void visit(DoNode node) {
     visit(node, rootStack);
   }
@@ -84,10 +90,11 @@ class JobNameBuilder {
   }
 
   public String build() {
-    return String.format("%s: %s (%d/%d)",
+    return String.format("%s: %s ID=%d (%d/%d)",
         pipelineName,
         shortenRootStackName(JOINER.join(rootStack), maxStackNameLength),
         jobID,
+        jobSequence,
         numOfJobs);
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/fd9778c9/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index e7a1e17..41da5a6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -223,11 +223,12 @@ class JobPrototype {
       }
       job.setInputFormatClass(CrunchInputFormat.class);
     }
-    job.setJobName(createJobName(conf, pipeline.getName(), inputNodes, reduceNode, numOfJobs));
+    JobNameBuilder jobNameBuilder = createJobNameBuilder(conf, pipeline.getName(), inputNodes, reduceNode, numOfJobs);
 
     return new CrunchControlledJob(
         jobID,
         job,
+        jobNameBuilder,
         new CrunchJobHooks.PrepareHook(job),
         new CrunchJobHooks.CompletionHook(job, outputPath, outputHandler.getMultiPaths(), group == null));
   }
@@ -242,14 +243,18 @@ class JobPrototype {
     DistCache.write(conf, path, rtNodes);
   }
 
-  private String createJobName(
-      Configuration conf, String pipelineName, List<DoNode> mapNodes, DoNode reduceNode, int numOfJobs) {
+  private JobNameBuilder createJobNameBuilder(
+      Configuration conf,
+      String pipelineName,
+      List<DoNode> mapNodes,
+      DoNode reduceNode,
+      int numOfJobs) {
     JobNameBuilder builder = new JobNameBuilder(conf, pipelineName, jobID, numOfJobs);
     builder.visit(mapNodes);
     if (reduceNode != null) {
       builder.visit(reduceNode);
     }
-    return builder.build();
+    return builder;
   }
 
   private DoNode walkPath(Iterator<PCollectionImpl<?>> iter, DoNode working) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/fd9778c9/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
index 562e99d..2c1f1be 100644
--- a/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
@@ -18,6 +18,7 @@
 package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol;
 
 import org.apache.crunch.impl.mr.MRJob;
+import org.apache.crunch.impl.mr.plan.JobNameBuilder;
 import org.apache.crunch.impl.mr.run.RuntimeParameters;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
@@ -65,6 +66,7 @@ public class CrunchJobControlTest {
     CrunchControlledJob job = new CrunchControlledJob(
         jobID,
         mrJob,
+        new JobNameBuilder(mrJob.getConfiguration(), "test", 1, 1),
         mock(CrunchControlledJob.Hook.class),
         mock(CrunchControlledJob.Hook.class));
     return spy(job);

http://git-wip-us.apache.org/repos/asf/crunch/blob/fd9778c9/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
index eef318e..ee7b398 100644
--- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
@@ -37,9 +37,9 @@ public class JobNameBuilderTest {
     DoNode doNode = createDoNode(nodeName);
     JobNameBuilder jobNameBuilder = new JobNameBuilder(CONF, pipelineName, 1, 1);
     jobNameBuilder.visit(Lists.newArrayList(doNode));
-    String jobName = jobNameBuilder.build();
+    String jobName = jobNameBuilder.jobSequence(1).build();
 
-    assertEquals(String.format("%s: %s (1/1)", pipelineName, nodeName), jobName);
+    assertEquals(String.format("%s: %s ID=1 (1/1)", pipelineName, nodeName), jobName);
   }
 
   @Test
@@ -49,7 +49,7 @@ public class JobNameBuilderTest {
     DoNode doNode = createDoNode(nodeName);
     JobNameBuilder jobNameBuilder = new JobNameBuilder(CONF, pipelineName, 1, 1);
     jobNameBuilder.visit(Lists.newArrayList(doNode));
-    String jobName = jobNameBuilder.build();
+    String jobName = jobNameBuilder.jobSequence(1).build();
 
     assertFalse(jobName.contains(nodeName)); // Tests that the very long node name was shorten
   }


Mime
View raw message