Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9720A17869 for ; Thu, 29 Jan 2015 17:51:25 +0000 (UTC) Received: (qmail 70993 invoked by uid 500); 29 Jan 2015 17:51:26 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 70957 invoked by uid 500); 29 Jan 2015 17:51:26 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 70948 invoked by uid 99); 29 Jan 2015 17:51:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 29 Jan 2015 17:51:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 07541E055B; Thu, 29 Jan 2015 17:51:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: <596908fc9ba84f49bcfe5690b8484084@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: crunch git commit: CRUNCH-493: Re-enable exception throwing on reading materialized PCollections after a pipeline failure Date: Thu, 29 Jan 2015 17:51:26 +0000 (UTC) Repository: crunch Updated Branches: refs/heads/master cbb1b7e75 -> 4f2b1f25f CRUNCH-493: Re-enable exception throwing on reading materialized PCollections after a pipeline failure Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/4f2b1f25 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/4f2b1f25 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/4f2b1f25 Branch: refs/heads/master Commit: 4f2b1f25fe21aca7c2df4747e754862d113173be Parents: cbb1b7e Author: Josh Wills Authored: Wed Jan 28 12:41:03 2015 -0800 Committer: Josh Wills Committed: Thu Jan 29 09:15:36 2015 -0800 ---------------------------------------------------------------------- .../java/org/apache/crunch/MaterializeIT.java | 54 ++++++++++++++------ .../materialize/MaterializableIterable.java | 2 +- 2 files changed, 40 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/4f2b1f25/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java index cb0f306..7bc61df 100644 --- a/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/MaterializeIT.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.util.List; import com.google.common.collect.Iterables; -import org.apache.crunch.fn.FilterFns; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.materialize.MaterializableIterable; @@ -36,6 +35,7 @@ import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.avro.AvroTypeFamily; import org.apache.crunch.types.avro.Avros; import org.apache.crunch.types.writable.WritableTypeFamily; +import org.apache.hadoop.conf.Configuration; import org.junit.Assume; import org.junit.Rule; import org.junit.Test; @@ -70,27 +70,27 @@ public class MaterializeIT { } @Test - public void testMaterializeEmptyIntermediate_Writables() throws IOException { + public void testMaterializeEmptyIntermediate() throws IOException { runMaterializeEmptyIntermediate( - new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()), - WritableTypeFamily.getInstance()); + new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration())); } @Test - public void testMaterializeEmptyIntermediate_Avro() throws IOException { - runMaterializeEmptyIntermediate( - new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration()), - AvroTypeFamily.getInstance()); + public void testMaterializeEmptyIntermediate_InMemory() throws IOException { + runMaterializeEmptyIntermediate(MemPipeline.getInstance()); } - @Test - public void testMaterializeEmptyIntermediate_InMemoryWritables() throws IOException { - runMaterializeEmptyIntermediate(MemPipeline.getInstance(), WritableTypeFamily.getInstance()); + @Test(expected = CrunchRuntimeException.class) + public void testMaterializeFailure() throws IOException { + runMaterializeWithFailure( + new MRPipeline(MaterializeIT.class, tmpDir.getDefaultConfiguration())); } @Test - public void testMaterializeEmptyIntermediate_InMemoryAvro() throws IOException { - runMaterializeEmptyIntermediate(MemPipeline.getInstance(), AvroTypeFamily.getInstance()); + public void testMaterializeNoFailure() throws IOException { + Configuration conf = tmpDir.getDefaultConfiguration(); + conf.setBoolean("crunch.empty.materialize.on.failure", true); + runMaterializeWithFailure(new MRPipeline(MaterializeIT.class, conf)); } public void runMaterializeInput(Pipeline pipeline, PTypeFamily typeFamily) throws IOException { @@ -102,14 +102,38 @@ public class MaterializeIT { pipeline.done(); } - public void runMaterializeEmptyIntermediate(Pipeline pipeline, PTypeFamily typeFamily) + public void runMaterializeEmptyIntermediate(Pipeline pipeline) throws IOException { String inputPath = tmpDir.copyResourceFileName("set1.txt"); - PCollection empty = pipeline.readTextFile(inputPath).filter(FilterFns.REJECT_ALL()); + PCollection empty = pipeline.readTextFile(inputPath).filter(new FilterAll(false)); assertTrue(Iterables.isEmpty(empty.materialize())); pipeline.done(); } + public void runMaterializeWithFailure(Pipeline pipeline) throws IOException { + String inputPath = tmpDir.copyResourceFileName("set1.txt"); + PCollection empty = pipeline.readTextFile(inputPath).filter(new FilterAll(true)); + empty.materialize().iterator(); + pipeline.done(); + } + + static class FilterAll extends FilterFn { + + private final boolean throwException; + + public FilterAll(boolean throwException) { + this.throwException = throwException; + } + + @Override + public boolean accept(T input) { + if (throwException) { + throw new RuntimeException("This is an exception"); + } + return false; + } + } + static class StringToStringWrapperPersonPairMapFn extends MapFn> { @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/4f2b1f25/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java index f83117f..232d0a1 100644 --- a/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java +++ b/crunch-core/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java @@ -92,7 +92,7 @@ public class MaterializableIterable implements Iterable { public Iterator iterator() { if (materialized == null) { this.result = pipeline.run(); - if (result.succeeded()) { + if (result.succeeded() || !pipeline.getConfiguration().getBoolean("crunch.empty.materialize.on.failure", false)) { materialize(); } else { LOG.error("Pipeline run failed, returning empty iterator");