crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-217: Ensure PipelineResult captures pipeline failures. Contributed by Joe Adler.
Date Mon, 10 Jun 2013 16:43:20 GMT
Updated Branches:
  refs/heads/master a6df0ccac -> b8ae33ddb


CRUNCH-217: Ensure PipelineResult captures pipeline failures. Contributed by Joe Adler.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b8ae33dd
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b8ae33dd
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b8ae33dd

Branch: refs/heads/master
Commit: b8ae33ddb7eae55626e718f5f2902565a2104e8c
Parents: a6df0cc
Author: Josh Wills <jwills@apache.org>
Authored: Mon Jun 10 09:32:32 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Mon Jun 10 09:32:32 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/crunch/PipelineResult.java  | 15 +++++++-----
 .../org/apache/crunch/impl/mem/MemPipeline.java | 24 +++++++++----------
 .../apache/crunch/impl/mr/exec/MRExecutor.java  | 25 ++++++++++----------
 3 files changed, 34 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/b8ae33dd/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
index 90b1067..71a05e2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
@@ -17,12 +17,11 @@
  */
 package org.apache.crunch;
 
-import java.util.List;
-
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Counters;
 
-import com.google.common.collect.ImmutableList;
+import java.util.List;
 
 /**
  * Container for the results of a call to {@code run} or {@code done} on the
@@ -58,16 +57,20 @@ public class PipelineResult {
     }
   }
 
-  public static final PipelineResult EMPTY = new PipelineResult(ImmutableList.<StageResult>
of());
+  public static final PipelineResult EMPTY = new PipelineResult(ImmutableList.<StageResult>
of(), PipelineExecution.Status.READY);
 
   private final List<StageResult> stageResults;
 
-  public PipelineResult(List<StageResult> stageResults) {
+  public PipelineExecution.Status status;
+
+  public PipelineResult(List<StageResult> stageResults, PipelineExecution.Status status)
{
     this.stageResults = ImmutableList.copyOf(stageResults);
+    this.status = status;
   }
 
   public boolean succeeded() {
-    return !stageResults.isEmpty();
+    // return !stageResults.isEmpty();
+    return this.status.equals(PipelineExecution.Status.SUCCEEDED);
   }
 
   public List<StageResult> getStageResults() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/b8ae33dd/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 80b0543..e2a2529 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -17,11 +17,9 @@
  */
 package org.apache.crunch.impl.mem;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericContainer;
@@ -38,7 +36,6 @@ import org.apache.crunch.PipelineResult;
 import org.apache.crunch.Source;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.Target;
-import org.apache.crunch.Target.WriteMode;
 import org.apache.crunch.impl.mem.collect.MemCollection;
 import org.apache.crunch.impl.mem.collect.MemTable;
 import org.apache.crunch.io.At;
@@ -54,9 +51,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Counters;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 public class MemPipeline implements Pipeline {
 
@@ -175,7 +173,7 @@ public class MemPipeline implements Pipeline {
   public void write(PCollection<?> collection, Target target,
       Target.WriteMode writeMode) {
     target.handleExisting(writeMode, getConfiguration());
-    if (writeMode != WriteMode.APPEND && activeTargets.contains(target)) {
+    if (writeMode != Target.WriteMode.APPEND && activeTargets.contains(target)) {
       throw new CrunchRuntimeException("Target " + target + " is already written in the current
run." +
           " Use WriteMode.APPEND in order to write additional data to it.");
     }
@@ -277,7 +275,8 @@ public class MemPipeline implements Pipeline {
 
       @Override
       public PipelineResult getResult() {
-        return new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage",
COUNTERS)));
+        return new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage",
COUNTERS)),
+            Status.SUCCEEDED);
       }
 
       @Override
@@ -289,7 +288,8 @@ public class MemPipeline implements Pipeline {
   @Override
   public PipelineResult run() {
     activeTargets.clear();
-    return new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage",
COUNTERS)));
+    return new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage",
COUNTERS)),
+        PipelineExecution.Status.SUCCEEDED);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/b8ae33dd/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index 9318271..1e03ff2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -17,14 +17,8 @@
  */
 package org.apache.crunch.impl.mr.exec;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.PipelineResult;
@@ -38,8 +32,13 @@ import org.apache.crunch.impl.mr.collect.PCollectionImpl;
 import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.hadoop.conf.Configuration;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Provides APIs for job control at runtime to clients.
@@ -143,12 +142,14 @@ public class MRExecutor implements MRPipelineExecution {
       }
 
       synchronized (this) {
-        result = new PipelineResult(stages);
         if (killSignal.getCount() == 0) {
           status.set(Status.KILLED);
+        } else if (!failures.isEmpty()) {
+          status.set(Status.FAILED);
         } else {
-          status.set(result.succeeded() ? Status.SUCCEEDED : Status.FAILED);
+          status.set(Status.SUCCEEDED);
         }
+        result = new PipelineResult(stages, status.get());
       }
     } catch (InterruptedException e) {
       throw new AssertionError(e); // Nobody should interrupt us.


Mime
View raw message