incubator-crunch-dev mailing list archives

From Gabriel Reid <>
Subject Re: Checkpointing in pipelines
Date Thu, 06 Sep 2012 14:18:48 GMT
Hi Josh,

The last thing I would be doing after completing a trans-atlantic
flight is checking developer mailing lists ;-)

What you're talking about (having a kind of rollback for job failures
somewhere along the pipeline) could be facilitated with what I was
talking about here, but it's not what I was trying to accomplish (I
think you realize that, but I'm just making sure). However, it does
kind of show that the name "checkpoint" isn't that descriptive for the
specific use case that I was talking about (which is what I was a bit
worried about).

To clarify, I'm talking about making it possible to specify that a
node in the execution graph of the pipeline shouldn't be merged with
its neighbors, but should instead remain a boundary between two other
nodes (like an output or a GBK does). The specific use case that I'm
going for is customizing the execution plan for performance, and not
for failure recovery.

I think we're on the same page here, but just referring to two
different use cases, right?

- Gabriel

On Thu, Sep 6, 2012 at 4:00 PM, Josh Wills <> wrote:
> I grok the concept and see the use case, but I was expecting that this
> email was going to be about checkpointing in the sense of having Crunch
> save state about the intermediate outputs of a processing pipeline and then
> supporting the ability to restart a failed pipeline from a checkpointed
> stage-- does that notion line up with what you had in mind here, or am I
> just sleep deprived?
> Josh, who just arrived in London
> On Wed, Sep 5, 2012 at 9:16 PM, Gabriel Reid <> wrote:
>> Hi guys,
>> In some instances, we want to do some kind of iterative processing in
>> Crunch, and run the same (or a similar) DoFn on the same PCollection
>> multiple times.
>> For example, let's say we've got a PCollection of "grid" objects, and we
>> want to iteratively divide each of these grids into four sub-grids, leading
>> to exponential growth of the data. The naive way to do this would be to do
>> the following:
>> PCollection<Grid> grids = …;
>> for (…){
>>    grids = grids.parallelDo(new SubdivideFn());
>> }
>> However, the above code would be optimized into a single chain of DoFns,
>> without increasing the number of mappers we've got per iteration, which of
>> course wouldn't work well with the exponential growth of the data.
>> The current way of getting around this is to add a call to
>> materialize().iterator() on the PCollection in each iteration (this is also
>> done in the PageRankIT integration test).
>> What I propose is adding a "checkpoint" method to PCollection to signify
>> that this should be an actual step in processing. This could work as
>> follows:
>> PCollection<Grid> grids = …;
>> for (…){
>>    grids = grids.parallelDo(new SubdivideFn()).checkpoint();
>> }
>> In the short term this could even be implemented as just a call to
>> materialize().iterator(), but encapsulating it in a method like this
>> would allow us to work with it more efficiently in the future,
>> especially once CRUNCH-34 is merged.
>> Any thoughts on this? The actual name of the method is my biggest concern,
>> I'm not sure if "checkpoint" is the best name for it, but I can't think of
>> anything better at the moment.
>> - Gabriel
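
The fusion behavior the proposal describes can be sketched in plain Java.
This is only an illustration with no Crunch dependency: the Node class,
countStages method, and the checkpoint flag are simplified stand-ins for
the planner's merging logic, not Crunch's actual API.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a Crunch execution plan: each Node is a DoFn,
// and consecutive DoFns fuse into one map stage unless a checkpoint
// (a materialization barrier) sits between them.
public class FusionSketch {

    static class Node {
        final String name;
        final boolean checkpoint;

        Node(String name, boolean checkpoint) {
            this.name = name;
            this.checkpoint = checkpoint;
        }
    }

    // Count the map stages a fusing planner would emit: a run of DoFns
    // collapses into a single stage, split only at checkpoint nodes.
    static int countStages(List<Node> plan) {
        if (plan.isEmpty()) {
            return 0;
        }
        int stages = 1;
        for (int i = 0; i < plan.size() - 1; i++) {
            if (plan.get(i).checkpoint) {
                stages++;  // barrier forces the next DoFn into a new stage
            }
        }
        return stages;
    }

    public static void main(String[] args) {
        List<Node> naive = new ArrayList<>();
        List<Node> checkpointed = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            // the naive loop body from the proposal: all four fuse together
            naive.add(new Node("SubdivideFn", false));
            // with a checkpoint after each parallelDo: one stage per iteration
            checkpointed.add(new Node("SubdivideFn", true));
        }
        System.out.println(countStages(naive));         // prints 1
        System.out.println(countStages(checkpointed));  // prints 4
    }
}
```

With four iterations, the naive plan fuses into a single map stage, while
the checkpointed plan keeps four separate stages, so the number of mappers
can grow with the data between iterations.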
> --
> Director of Data Science
> Cloudera <>
> Twitter: @josh_wills <>
