crunch-user mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: PipelineResult#succeeded interpretation
Date Wed, 21 Jan 2015 05:35:45 GMT
Hey Peter,

So I wrote this integration test to see how Crunch-on-Spark handles
inter-job failures, and things seemed to happen correctly when I ran a
Spark job that had a failing function. However, we also have a condition
where a no-op job (i.e., calling Pipeline.done when there are no targets
to write out) will also return succeeded() == true. I'm wondering if that
is what is happening here: are you calling run() at one point, getting a
failure in the PipelineResult that is returned, and then calling done()
and getting the dummy PipelineResult that always returns succeeded() ==
true because it didn't do anything?
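To make that concrete, the pattern I have in mind would look something
like this (variable names are just for illustration, not from your code):

  Pipeline p = new SparkPipeline("local", "example");
  // ... build the pipeline and write some targets ...
  PipelineResult first = p.run();  // reflects the real job outcome
  // first.succeeded() is false here, because a function threw
  PipelineResult last = p.done();  // nothing left to write, so a no-op
  // last.succeeded() is true: the dummy result for a run that did
  // nothing, which masks the earlier failure if you only check the
  // result of done()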

J

package org.apache.crunch;

import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.crunch.io.To;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.types.writable.Writables;
import org.junit.Rule;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class SparkFailureIT {
  @Rule
  public TemporaryPath tmpDir = new TemporaryPath();

  @Test
  public void testFailure() throws Exception {
    Pipeline p = new SparkPipeline("local", "failure");
    PCollection<String> shakes =
        p.readTextFile(tmpDir.copyResourceFileName("shakes.txt"));
    // Apply a function that always throws, so the job must fail.
    PCollection<String> lower = shakes.parallelDo(new FailureFn(),
        Writables.strings());
    lower.write(To.textFile(tmpDir.getPath("out")));
    // The failed run should be reported by the result of done(); the
    // second call checks that a repeated done() reports failure too.
    assertEquals(false, p.done().succeeded());
    assertEquals(false, p.done().succeeded());
  }

  public static class FailureFn extends DoFn<String, String> {
    @Override
    public void process(String input, Emitter<String> emitter) {
      throw new RuntimeException("Oh no");
    }
  }
}
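
And for contrast, a minimal sketch of the no-op case I mentioned, where
done() has no targets left to write and hands back the always-successful
dummy result:

  Pipeline p = new SparkPipeline("local", "noop");
  // No targets were written, so done() has no jobs to execute and
  // returns a dummy PipelineResult whose succeeded() is always true.
  PipelineResult result = p.done();
  assertEquals(true, result.succeeded());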

On Tue, Jan 20, 2015 at 10:01 AM, Peter Dolan <peter@nunahealth.com> wrote:

> Thanks Josh!
>
> On Tue, Jan 20, 2015 at 9:58 AM, Josh Wills <jwills@cloudera.com> wrote:
>
>> Okay, created https://issues.apache.org/jira/browse/CRUNCH-488 to track
>> it. Should get a patch together by tmrw.
>>
>> J
>>
>> On Mon, Jan 19, 2015 at 4:57 PM, Peter Dolan <peter@nunahealth.com>
>> wrote:
>>
>>> So far I've only tried this in the SparkPipeline.  In MemPipeline the
>>> entire JVM dies, so we don't get to determine success or failure.
>>>
>>> On Mon, Jan 19, 2015 at 10:47 AM, Josh Wills <josh.wills@gmail.com>
>>> wrote:
>>>
>>>> No, that's not good, we should fix that. Is it only in the
>>>> SparkPipeline that the situation occurs?
>>>>
>>>> On Mon, Jan 19, 2015 at 8:28 AM, Peter Dolan <peter@nunahealth.com>
>>>> wrote:
>>>>
>>>>> Hi Crunchers,
>>>>>
>>>>> At Nuna we've been using Crunch extensively, and I'm really thrilled
>>>>> with it.  It's excellent.  There are of course some rough edges though.
>>>>>
>>>>> Today I ran into some exceptions being thrown in the Spark pipeline,
>>>>> and am curious why they weren't resulting in the PipelineResult reporting
>>>>> failure.  In particular, my spark pipeline (running with a local spark
>>>>> instance, that is, with the spark master set to "local[16]") failed with an
>>>>> IOException when the machine ran out of space in /tmp/.  The PipelineResult
>>>>> retrieved by Pipeline#done returned true from PipelineResult#succeeded.
>>>>>
>>>>> I've seen this in a couple of other contexts as well, for example when
>>>>> a MapFn threw an exception within MapFn#map and succeeded() still
>>>>> returned true.
>>>>>
>>>>> Is this expected / intended behavior?  Should I be getting at the
>>>>> success or failure of the execution some other way?
>>>>>
>>>>> Thanks!
>>>>> - Peter
>>>>>
>>>>
>>>>
>>>
>>
>>
>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
