crunch-dev mailing list archives

From "Micah Whitacre (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (CRUNCH-405) Explore adding support for idempotent MRPipeline.plan()
Date Mon, 21 Jul 2014 14:18:38 GMT

    [ https://issues.apache.org/jira/browse/CRUNCH-405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14068555#comment-14068555 ]

Micah Whitacre commented on CRUNCH-405:
---------------------------------------

While thinking through how things should behave once materialized state moves to
the pipeline, I figured an example might help...

If we have a pipeline that looked like the following:

{code}
pcoll1 = pipeline.read(...);
pcoll2 = pcoll1.map(...);
pcoll3 = pcoll2.map(...);
pipeline.write(pcoll1);
pipeline.write(pcoll2);
pcoll3.materialize();
pipeline.run(); //generates the two outputs and materialized
pipeline.run(); //would do nothing because outputTargets are cleared on the previous run
                //and nothing gets picked for materialization
{code}

If we change the pipeline slightly...

{code}
pcoll1 = pipeline.read(...);
pcoll2 = pcoll1.map(...);
pcoll3 = pcoll2.map(...);
pipeline.write(pcoll1);
pipeline.write(pcoll2);
pcoll3.materialize();
pipeline.run(); //generates the two outputs and materialized
pcoll3.write(...);
pipeline.run(); //would only generate one target for pcoll3 but also wouldn't utilize
                //the stored pcoll1 and pcoll2
{code}
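
To make the behavior in the two examples above concrete, here is a minimal toy model, not Crunch code. The class ToyPipeline and its pendingTargets field are hypothetical names, and the only behavior modeled is the one described in the comments above: a run generates whatever targets are pending and then clears them.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy stand-in for a pipeline; NOT the Crunch API. All names are hypothetical.
class ToyPipeline {
    // Targets registered since the last run (writes and materializations alike).
    private final Set<String> pendingTargets = new LinkedHashSet<>();

    void write(String target) {
        pendingTargets.add(target);
    }

    // Generates everything pending, then clears the pending set, mirroring
    // "outputTargets are cleared on the previous run".
    List<String> run() {
        List<String> generated = new ArrayList<>(pendingTargets);
        pendingTargets.clear();
        return generated;
    }

    public static void main(String[] args) {
        ToyPipeline pipeline = new ToyPipeline();
        pipeline.write("pcoll1-out");
        pipeline.write("pcoll2-out");
        pipeline.write("pcoll3-materialized");
        System.out.println(pipeline.run()); // [pcoll1-out, pcoll2-out, pcoll3-materialized]
        System.out.println(pipeline.run()); // [] (nothing left to do)
        pipeline.write("pcoll3-out");
        System.out.println(pipeline.run()); // [pcoll3-out] (earlier targets are not revisited)
    }
}
```

The model deliberately ignores whether a later run could reuse the stored pcoll1 and pcoll2; it only shows why a repeated run() has nothing to generate.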

With the change to make plan() idempotent, things get more complex, as in the following:

{code}
pcoll1 = pipeline.read(...);
pcoll2 = pcoll1.map(...);
pcoll3 = pcoll2.map(...);
pipeline.write(pcoll1);
pipeline.write(pcoll2);
pcoll3.materialize();
plan1 = pipeline.plan(); //generates the two outputs and materialized
pcoll3.write(...);
plan2 = pipeline.plan(); //generates the 3 targets + materialize
plan2.execute();
plan1.execute();
{code}

While someone could do this, I'm not sure it is a valid workflow.  Specifically, in this case
the output targets are not cleared (clearing is done in, e.g., runAsync()), so the two executions
will conflict over who generates the targets.  I believe this is the scenario you were concerned
about when you stated the following...

{quote}
We might need some sync logic in there to make sure two identical plans weren't executed simultaneously--
there would need to be a way for the execution of one plan to invalidate the execution of
any others that were created.
{quote}

So what would be the preferred action?  Should executing plan1 fail?  Or should it be attempted,
with both executions failing when they conflict over the targets being created?

I'm toying with a couple of options...

* The original patch added a method to MRPipeline.  Instead of returning an
executable MRExecutor, it could return a no-op so that the plan could never be executed.  Only
a legitimate run/plan would update the state, which prevents the duplication.
{code}
public MRExecutor plan(boolean dryRun)
{code}
* Add a "version" to the pipeline which is incremented on each write/materialize.  Each execution
then records on the pipeline which version it executed.  We could then add logic to invalidate
lower-version runs that might be duplicates or stale plans.
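
As a rough illustration of the first option, here is a minimal sketch of a plan(boolean dryRun) that hands back an inspectable but non-executable result. It is not the patch and not the Crunch API: DryRunPipeline and ToyExecutor are hypothetical stand-ins for MRPipeline and MRExecutor, and having a real plan clear the pending targets at plan time is an assumption made only to keep the example short.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for MRPipeline; not the real Crunch type.
class DryRunPipeline {
    private final List<String> pendingTargets = new ArrayList<>();

    void write(String target) {
        pendingTargets.add(target);
    }

    int pendingCount() {
        return pendingTargets.size();
    }

    ToyExecutor plan(boolean dryRun) {
        final List<String> snapshot =
            Collections.unmodifiableList(new ArrayList<>(pendingTargets));
        final boolean executable = !dryRun;
        if (executable) {
            // Assumption: only a real plan takes ownership of the pending targets.
            pendingTargets.clear();
        }
        return new ToyExecutor() {
            @Override public List<String> targets() { return snapshot; }
            @Override public boolean execute() { return executable; }
        };
    }

    public static void main(String[] args) {
        DryRunPipeline pipeline = new DryRunPipeline();
        pipeline.write("pcoll1-out");
        pipeline.write("pcoll2-out");
        ToyExecutor dry = pipeline.plan(true);
        System.out.println(dry.targets());           // [pcoll1-out, pcoll2-out]
        System.out.println(dry.execute());           // false (a dry-run plan is a no-op)
        System.out.println(pipeline.pendingCount()); // 2 (state untouched)
        ToyExecutor real = pipeline.plan(false);
        System.out.println(real.execute());          // true
        System.out.println(pipeline.pendingCount()); // 0
    }
}

// Hypothetical stand-in for MRExecutor; not the real Crunch type.
interface ToyExecutor {
    List<String> targets(); // what the plan would produce, for inspection
    boolean execute();      // true only if the plan was allowed to run
}
```

Because the dry-run path never touches pipeline state, it can be called any number of times before the real plan, which is the use case the issue describes.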

I'm not really sold on either, and I'm sure there are flaws in the last approach; it is
not fully baked.
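
To make the version idea easier to poke holes in, here is a minimal sketch under stated assumptions. VersionedPipeline and VersionedPlan are hypothetical names, not Crunch classes; the version is assumed to be a counter bumped on every write/materialize, and a plan is rejected when its version is at or below the last executed one. The sketch is not thread safe.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MRPipeline; not the real Crunch type.
class VersionedPipeline {
    private final List<String> targets = new ArrayList<>();
    private long version = 0;              // bumped on every write/materialize
    private long lastExecutedVersion = -1; // highest version executed so far

    void write(String target) {
        targets.add(target);
        version++;
    }

    // Planning only snapshots state, so it can be called repeatedly.
    VersionedPlan plan() {
        return new VersionedPlan(this, version, new ArrayList<>(targets));
    }

    // Rejects plans at or below the last executed version (stale or duplicate).
    boolean tryExecute(VersionedPlan plan) {
        if (plan.version <= lastExecutedVersion) {
            return false;
        }
        lastExecutedVersion = plan.version;
        return true;
    }

    public static void main(String[] args) {
        VersionedPipeline pipeline = new VersionedPipeline();
        pipeline.write("pcoll1-out");
        pipeline.write("pcoll2-out");
        pipeline.write("pcoll3-materialized");
        VersionedPlan plan1 = pipeline.plan(); // version 3
        pipeline.write("pcoll3-out");
        VersionedPlan plan2 = pipeline.plan(); // version 4
        System.out.println(plan2.execute());   // true
        System.out.println(plan1.execute());   // false (stale: 3 <= 4)
        System.out.println(plan2.execute());   // false (duplicate: 4 <= 4)
    }
}

// Hypothetical plan handle that remembers the pipeline version it was built from.
class VersionedPlan {
    private final VersionedPipeline pipeline;
    final long version;
    final List<String> targets;

    VersionedPlan(VersionedPipeline pipeline, long version, List<String> targets) {
        this.pipeline = pipeline;
        this.version = version;
        this.targets = targets;
    }

    boolean execute() {
        return pipeline.tryExecute(this);
    }
}
```

One flaw shows up immediately: if plan1 is executed before plan2, the check lets plan2 through because its version is higher, even though it would regenerate the targets plan1 already produced.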

Also, just checking: what are our thread safety guarantees around MRPipeline/DistributedPipeline?
 I assume they should be marked as not thread safe, but I wasn't sure if there were use cases
I might have missed where they should be considered otherwise.






> Explore adding support for idempotent MRPipeline.plan()
> -------------------------------------------------------
>
>                 Key: CRUNCH-405
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-405
>             Project: Crunch
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Micah Whitacre
>            Assignee: Micah Whitacre
>         Attachments: CRUNCH-405_v1.patch
>
>
> Talking through a use case with a consumer, they were interested in having the ability
> to run the MRPipeline.plan() method one or more times prior to ever calling the Pipeline.run/done
> methods.  The reason for this was they were looking at pulling information off the MRExecutor
> to tweak settings inside of their DoFns.
> Currently, however, the MRPipeline implementation does not have an idempotent plan() method,
> as it alters the state of internal values, therefore affecting the full run once done() is
> called.
> It would be nice if we added an idempotent plan() method that could gather this information,
> or perhaps a reset option.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
