crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mkw...@apache.org
Subject git commit: Crunch-254: Added access to the underlying JobId for MRPipeline jobs
Date Wed, 21 Aug 2013 01:06:49 GMT
Updated Branches:
  refs/heads/master bf1a094f8 -> c10996a4d


Crunch-254: Added access to the underlying JobId for MRPipeline jobs

Signed-off-by: Micah Whitacre <mkwhit@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/c10996a4
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/c10996a4
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/c10996a4

Branch: refs/heads/master
Commit: c10996a4d995d72a0ae7dab78a2da4aca6a56af3
Parents: bf1a094
Author: Micah Whitacre <mkwhit@apache.org>
Authored: Mon Aug 19 21:17:06 2013 -0500
Committer: Micah Whitacre <mkwhit@apache.org>
Committed: Tue Aug 20 19:50:57 2013 -0500

----------------------------------------------------------------------
 .../src/it/java/org/apache/crunch/MRPipelineIT.java       |  7 ++++++-
 .../src/main/java/org/apache/crunch/PipelineResult.java   | 10 ++++++++++
 .../java/org/apache/crunch/impl/mr/exec/MRExecutor.java   |  2 +-
 3 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/c10996a4/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
index 7670e88..25c85c8 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 
+import org.apache.crunch.PipelineResult.StageResult;
 import org.apache.crunch.fn.FilterFns;
 import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.mr.MRPipeline;
@@ -68,7 +69,11 @@ public class MRPipelineIT implements Serializable {
     
     pipeline.writeTextFile(ungroupedTableA, outputDirA.getAbsolutePath());
     pipeline.writeTextFile(ungroupedTableB, outputDirB.getAbsolutePath());
-    pipeline.done();
+    PipelineResult result = pipeline.done();
+    for(StageResult stageResult : result.getStageResults()){
+      assertTrue(stageResult.getStageName().length() > 1);
+      assertTrue(stageResult.getStageId().length() > 1);
+    }
 
     // Verify that output from a single PGroupedTable can be sent to multiple collections
     assertTrue(new File(outputDirA, "part-r-00000").exists());

http://git-wip-us.apache.org/repos/asf/crunch/blob/c10996a4/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
index 74a073f..bd29999 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
@@ -38,10 +38,16 @@ public class PipelineResult {
   public static class StageResult {
 
     private final String stageName;
+    private final String stageId;
     private final Counters counters;
 
     public StageResult(String stageName, Counters counters) {
+      this(stageName, stageName, counters);
+    }
+
+    public StageResult(String stageName, String stageId, Counters counters){
       this.stageName = stageName;
+      this.stageId = stageId;
       this.counters = counters;
     }
 
@@ -49,6 +55,10 @@ public class PipelineResult {
       return stageName;
     }
 
+    public String getStageId(){
+      return stageId;
+    }
+
     /**
      * @deprecated The {@link Counter} class changed incompatibly between Hadoop 1 and 2
      * (from a class to an interface) so user programs should avoid this method and use

http://git-wip-us.apache.org/repos/asf/crunch/blob/c10996a4/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index 1e03ff2..e223e5f 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -112,7 +112,7 @@ public class MRExecutor implements MRPipelineExecution {
       }
       List<PipelineResult.StageResult> stages = Lists.newArrayList();
       for (CrunchControlledJob job : control.getSuccessfulJobList()) {
-        stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters()));
+        stages.add(new PipelineResult.StageResult(job.getJobName(), job.getMapredJobID().toString(), job.getJob().getCounters()));
       }
 
       for (PCollectionImpl<?> c : outputTargets.keySet()) {


Mime
View raw message