crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-342: Add start and end time info to StageResult for both the job itself and the pre and post-job hooks.
Date Fri, 21 Feb 2014 17:13:01 GMT
Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 42bbabd08 -> 2c963c6ba


CRUNCH-342: Add start and end time info to StageResult for both the job itself
and the pre and post-job hooks.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2c963c6b
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2c963c6b
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2c963c6b

Branch: refs/heads/apache-crunch-0.8
Commit: 2c963c6baf594db5b96de632e643c22c26501c17
Parents: 42bbabd
Author: Josh Wills <jwills@apache.org>
Authored: Thu Feb 13 18:08:36 2014 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Fri Feb 21 09:12:52 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/crunch/PipelineResult.java  | 48 ++++++++++++++++++--
 .../lib/jobcontrol/CrunchControlledJob.java     | 24 ++++++++++
 .../apache/crunch/impl/mr/exec/MRExecutor.java  |  3 +-
 .../apache/crunch/impl/spark/SparkRuntime.java  |  4 +-
 4 files changed, 74 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/2c963c6b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
index c1ecdd3..5325bf3 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
@@ -41,26 +41,68 @@ public class PipelineResult {
     private final String stageName;
     private final String stageId;
     private final Counters counters;
+    private final long startTimeMsec;
+    private final long jobStartTimeMsec;
+    private final long jobEndTimeMsec;
+    private final long endTimeMsec;
 
     public StageResult(String stageName, Counters counters) {
-      this(stageName, stageName, counters);
+      this(stageName, counters, System.currentTimeMillis(), System.currentTimeMillis());
     }
 
-    public StageResult(String stageName, String stageId, Counters counters) {
+    public StageResult(String stageName, Counters counters, long startTimeMsec, long endTimeMsec) {
+      this(stageName, stageName, counters, startTimeMsec, startTimeMsec, endTimeMsec, endTimeMsec);
+    }
+
+    public StageResult(String stageName, String stageId, Counters counters, long startTimeMsec,
+        long jobStartTimeMsec, long jobEndTimeMsec, long endTimeMsec) {
       this.stageName = stageName;
       this.stageId = stageId;
       this.counters = counters;
+      this.startTimeMsec = startTimeMsec;
+      this.jobStartTimeMsec = jobStartTimeMsec;
+      this.jobEndTimeMsec = jobEndTimeMsec;
+      this.endTimeMsec = endTimeMsec;
     }
 
     public String getStageName() {
       return stageName;
     }
 
-    public String getStageId(){
+    public String getStageId() {
       return stageId;
     }
 
     /**
+     * @return the overall start time for this stage, that is, the time at which any pre-job hooks were
+     * started.
+     */
+    public long getStartTimeMsec() {
+      return startTimeMsec;
+    }
+
+    /**
+     * @return the time that the work for this stage was submitted to the cluster for execution, if applicable.
+     */
+    public long getJobStartTimeMsec() {
+      return jobStartTimeMsec;
+    }
+
+    /**
+     * @return the time that the work for this stage finished processing on the cluster, if applicable.
+     */
+    public long getJobEndTimeMsec() {
+      return jobEndTimeMsec;
+    }
+
+    /**
+     * @return the overall end time for this stage, that is, the time at which any post-job hooks completed.
+     */
+    public long getEndTimeMsec() {
+      return endTimeMsec;
+    }
+
+    /**
      * @deprecated The {@link Counter} class changed incompatibly between Hadoop 1 and 2
      * (from a class to an interface) so user programs should avoid this method and use
      * {@link #getCounterNames()}.

http://git-wip-us.apache.org/repos/asf/crunch/blob/2c963c6b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
index aaf9f04..5dbb43e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -62,6 +62,10 @@ public class CrunchControlledJob implements MRJob {
   private String message;
   private String lastKnownProgress;
   private Counters counters;
+  private long preHookStartTimeMsec;
+  private long jobStartTimeMsec;
+  private long jobEndTimeMsec;
+  private long postHookEndTimeMsec;
 
   /**
    * Construct a job.
@@ -138,6 +142,22 @@ public class CrunchControlledJob implements MRJob {
     return this.job.getJobID();
   }
 
+  public long getStartTimeMsec() {
+    return preHookStartTimeMsec;
+  }
+
+  public long getJobStartTimeMsec() {
+    return jobStartTimeMsec;
+  }
+
+  public long getJobEndTimeMsec() {
+    return jobEndTimeMsec;
+  }
+
+  public long getEndTimeMsec() {
+    return postHookEndTimeMsec;
+  }
+
   public Counters getCounters() {
     return counters;
   }
@@ -231,6 +251,7 @@ public class CrunchControlledJob implements MRJob {
   private void checkRunningState() throws IOException, InterruptedException {
     try {
       if (job.isComplete()) {
+        this.jobEndTimeMsec = System.currentTimeMillis();
         this.counters = job.getCounters();
         if (job.isSuccessful()) {
           this.state = State.SUCCESS;
@@ -256,6 +277,7 @@ public class CrunchControlledJob implements MRJob {
     }
     if (isCompleted()) {
       completionHook.run();
+      this.postHookEndTimeMsec = System.currentTimeMillis();
     }
   }
 
@@ -303,7 +325,9 @@ public class CrunchControlledJob implements MRJob {
    */
   protected synchronized void submit() {
     try {
+      this.preHookStartTimeMsec = System.currentTimeMillis();
       prepareHook.run();
+      this.jobStartTimeMsec = System.currentTimeMillis();
       job.submit();
       this.state = State.RUNNING;
       LOG.info("Running job \"" + getJobName() + "\"");

http://git-wip-us.apache.org/repos/asf/crunch/blob/2c963c6b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index ce6fffa..1137498 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -123,7 +123,8 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe
       }
       List<PipelineResult.StageResult> stages = Lists.newArrayList();
       for (CrunchControlledJob job : control.getSuccessfulJobList()) {
-        stages.add(new PipelineResult.StageResult(job.getJobName(), job.getMapredJobID().toString(), job.getCounters()));
+        stages.add(new PipelineResult.StageResult(job.getJobName(), job.getMapredJobID().toString(), job.getCounters(),
+            job.getStartTimeMsec(), job.getJobStartTimeMsec(), job.getJobEndTimeMsec(), job.getEndTimeMsec()));
       }
 
       for (PCollectionImpl<?> c : outputTargets.keySet()) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/2c963c6b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
index 99268cc..ecc7023 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -198,6 +198,7 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
 
   private void monitorLoop() {
     status.set(Status.RUNNING);
+    long start = System.currentTimeMillis();
     Map<PCollectionImpl<?>, Set<SourceTarget<?>>> targetDeps = Maps.<PCollectionImpl<?>, PCollectionImpl<?>, Set<SourceTarget<?>>>newTreeMap(DEPTH_COMPARATOR);
     for (PCollectionImpl<?> pcollect : outputTargets.keySet()) {
       targetDeps.put(pcollect, pcollect.getTargetDependencies());
@@ -280,7 +281,8 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
     }
     if (status.get() != Status.FAILED || status.get() != Status.KILLED) {
       status.set(Status.SUCCEEDED);
-      result = new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("Spark", null)),
+      result = new PipelineResult(
+          ImmutableList.of(new PipelineResult.StageResult("Spark", null, start, System.currentTimeMillis())),
           Status.SUCCEEDED);
       set(result);
     } else {


Mime
View raw message