crunch-user mailing list archives

From Josh Wills <josh.wi...@gmail.com>
Subject Re: SequentialDo for failed Jobs: bug or feature?
Date Tue, 26 Jan 2016 20:14:32 GMT
No, that seems like a bug, at least the part where it doesn't let you
decide (based on the status of the job) whether and how to execute the
PipelineCallable. This seems tied to the idea that the Pipeline interface
should give you insight, through its API, into the PipelineResult object
for any job, target, etc. Does that make sense?

J
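One way to picture the suggestion in this reply is that the scheduler hands each sequential step the outcome of the jobs that produce its inputs, and the step then decides whether to run, recover, or fail. The sketch below is a plain-Java model under that assumption only. `StatusAwareCallable`, `JobOutcome`, and the map of upstream outcomes are hypothetical names. They are not part of Crunch's `Pipeline`, `PipelineCallable`, or `PipelineResult` API.

```java
import java.util.Map;

// Hypothetical model of the idea in the reply: before running a sequential step,
// the scheduler passes it the outcome of each upstream job, and the step decides.
// None of these types are Crunch API.
final class StatusAwareDemo {

  enum Status { SUCCESS, FAILURE }

  // Stand-in for per-job information a PipelineResult-like object could expose.
  enum JobOutcome { SUCCEEDED, FAILED }

  // A sequential step that sees upstream outcomes (keyed by job name) before doing any work.
  interface StatusAwareCallable {
    Status call(Map<String, JobOutcome> upstream);
  }

  // Example policy: report FAILURE without touching inputs if any upstream job failed.
  // A step that can recover could instead rebuild the missing input at this point.
  static final StatusAwareCallable AFTER_BROKEN = upstream -> {
    if (upstream.containsValue(JobOutcome.FAILED)) {
      return Status.FAILURE;
    }
    // Normal work on the materialized inputs would go here.
    return Status.SUCCESS;
  };

  public static void main(String[] args) {
    System.out.println(AFTER_BROKEN.call(Map.of("BrokenMapper", JobOutcome.FAILED)));    // FAILURE
    System.out.println(AFTER_BROKEN.call(Map.of("BrokenMapper", JobOutcome.SUCCEEDED))); // SUCCESS
  }
}
```

The design choice here is to give the step the information rather than simply blocking it. This keeps the "recover from a failed job" case possible while still letting a step opt out cleanly.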

On Tue, Jan 26, 2016 at 8:50 AM, Igor Bernstein <igorbernstein@spotify.com> wrote:

> Hi all,
>
> I was hoping someone could clarify how to handle errors in SequentialDos. I
> was really surprised to learn that SequentialDos get called for failed
> Jobs. See the example below.
>
> From what I can tell, this happens because
> CrunchJobControl#executeReadySeqDoFns runs all PipelineCallables whose
> Targets are not "unfinished", and CrunchJobControl#getUnfinishedTargets
> defines unfinished Targets as the Targets produced by waiting, running, or
> ready Jobs. In other words, Targets that belong to failed Jobs are
> considered finished. Is this intentional?
>
> I can see this being useful in some scenarios where my PipelineCallable
> can recover from a failed Job, but I don't see a clear way for my
> PipelineCallable to detect the upstream failure.
>
> Does anyone know whether this is a bug (and failed Targets should be
> considered unfinished) or a feature (and I'm missing a way to check for
> upstream failure)?
>
> Thanks in advance!
>
>
> Example Code
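> import org.apache.crunch.MapFn;
> import org.apache.crunch.PCollection;
> import org.apache.crunch.Pipeline;
> import org.apache.crunch.PipelineCallable;
> import org.apache.crunch.PipelineResult;
> import org.apache.crunch.impl.mr.MRPipeline;
> import org.apache.crunch.io.From;
> import org.apache.crunch.types.writable.Writables;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
>
> class Main extends Configured implements Tool {
>   @Override
>   public int run(String[] args) throws Exception {
>     MRPipeline pipeline = new MRPipeline(Main.class, getConf());
>     PCollection<String> lines = pipeline.read(From.textFile("strings.txt"));
>
>     // Every call to map() throws, so the job producing "input" fails.
>     lines
>         .parallelDo(new BrokenMapper(), Writables.strings())
>         .sequentialDo("input", new AfterBroken());
>
>     // Nothing executes until the pipeline is actually run.
>     PipelineResult result = pipeline.done();
>     return result.succeeded() ? 0 : 1;
>   }
>
>   static class BrokenMapper extends MapFn<String, String> {
>     @Override
>     public String map(String input) {
>       throw new RuntimeException("I'm broken");
>     }
>   }
>
>   static class AfterBroken extends PipelineCallable<Void> {
>     @Override
>     protected Void getOutput(Pipeline pipeline) {
>       return null;
>     }
>
>     @Override
>     @SuppressWarnings("unchecked")
>     public Status call() throws Exception {
>       System.out.println("I should not be called because I don't have a materialized collection");
>
>       PCollection<String> lines = (PCollection<String>) getOnlyPCollection();
>       // Surprise! The collection hasn't been materialized (the upstream job failed).
>       Iterable<String> materialized = lines.materialize();
>
>       return Status.SUCCESS;
>     }
>   }
>
>   public static void main(String[] args) throws Exception {
>     int rc = ToolRunner.run(new Main(), args);
>     System.exit(rc);
>   }
> }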
>
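The readiness rule described in the quoted message can be modeled in a few lines. This is a simplified, hypothetical model, not the actual `CrunchJobControl` code. `Job`, `State`, `unfinishedTargets`, `blockedTargets`, and `runnable` are illustrative names. The model shows why a callable that depends on the target of a failed job is released, and how also treating failed targets as blocking would keep it from running.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the readiness rule described in the thread.
// Job, State and the method names are illustrative; they are not Crunch classes.
final class ReadinessModel {

  enum State { WAITING, READY, RUNNING, SUCCEEDED, FAILED }

  // A job and the single target (output) it writes.
  static final class Job {
    final String name;
    final String target;
    final State state;

    Job(String name, String target, State state) {
      this.name = name;
      this.target = target;
      this.state = state;
    }
  }

  // Only waiting, ready and running jobs count as "unfinished".
  static final Set<State> UNFINISHED = EnumSet.of(State.WAITING, State.READY, State.RUNNING);

  // Rule as described: targets of FAILED jobs are left out, so they look finished.
  static Set<String> unfinishedTargets(List<Job> jobs) {
    Set<String> targets = new HashSet<>();
    for (Job job : jobs) {
      if (UNFINISHED.contains(job.state)) {
        targets.add(job.target);
      }
    }
    return targets;
  }

  // Alternative rule: targets of FAILED jobs also block their dependents.
  static Set<String> blockedTargets(List<Job> jobs) {
    Set<String> targets = unfinishedTargets(jobs);
    for (Job job : jobs) {
      if (job.state == State.FAILED) {
        targets.add(job.target);
      }
    }
    return targets;
  }

  // A callable is released when none of its dependency targets is in notReady.
  static List<String> runnable(Map<String, Set<String>> dependencies, Set<String> notReady) {
    List<String> released = new ArrayList<>();
    for (Map.Entry<String, Set<String>> entry : dependencies.entrySet()) {
      if (Collections.disjoint(entry.getValue(), notReady)) {
        released.add(entry.getKey());
      }
    }
    return released;
  }

  public static void main(String[] args) {
    List<Job> jobs = List.of(new Job("BrokenMapper", "input", State.FAILED));
    Map<String, Set<String>> dependencies = new LinkedHashMap<>();
    dependencies.put("AfterBroken", Set.of("input"));

    System.out.println(runnable(dependencies, unfinishedTargets(jobs))); // [AfterBroken]
    System.out.println(runnable(dependencies, blockedTargets(jobs)));    // []
  }
}
```

Under the described rule, `AfterBroken` is released even though `BrokenMapper` failed. Under the alternative rule it never runs. The alternative gives up the recovery scenario mentioned in the message, so exposing the job status to the callable, rather than simply blocking it, may be the more flexible design.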
