crunch-dev mailing list archives

From "Gabriel Reid (JIRA)" <>
Subject [jira] [Commented] (CRUNCH-222) Planner should choose to run DoFns on Map or Reduce side depending on data size
Date Wed, 19 Jun 2013 03:31:20 GMT


Gabriel Reid commented on CRUNCH-222:

I think that this could also be partly resolved by some of the functionality being discussed
in CRUNCH-218, specifically around forcing a given PCollection to be checkpointed to disk.
The same kind of thing could be done with the state of the PCollection as it is after MyDoFn,
and I think it would give the same result as what you're describing here.

I'm a little bit uneasy about shaping the workflows too much based on size estimates, mostly
because I usually forget to implement DoFn#scaleFactor, and I assume that I'm not the only
person who has that issue. With that in mind, I think I'd feel more comfortable with a more
explicit way to specify, either directly or indirectly, that a DoFn should be run on the
map or reduce side.

On the other hand, right now the size of PCollections is only used for deciding on the number
of reducers, although it could probably be used for much more (as illustrated by this issue),
so maybe we should embrace it more and really try to do optimizations based on data size.
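To make the concern about an unimplemented scaleFactor() concrete, here is a hypothetical, stdlib-only Java sketch (not Crunch's actual planner code) of how a size estimate might be propagated through a DoFn and turned into a reducer count. It assumes that a DoFn's estimated output is its input size multiplied by its scale factor, that a DoFn which does not override scaleFactor() gets a default just under 1.0 (0.99 here), and that the planner targets 1 GB of input per reducer; the class and method names are made up for illustration.

```java
/**
 * Hypothetical sketch of size-based estimates; not Crunch's implementation.
 * All constants below are assumptions chosen for illustration.
 */
public class SizeEstimateSketch {
  // Assumed default scale factor for a DoFn that does not override scaleFactor().
  public static final double DEFAULT_SCALE = 0.99;
  // Assumed target amount of input per reducer: 1 GB.
  public static final long BYTES_PER_REDUCER = 1_000_000_000L;

  /** Estimated output size of a DoFn, given its estimated input size and scale factor. */
  public static long estimateOutput(long inputBytes, double scaleFactor) {
    return (long) (inputBytes * scaleFactor);
  }

  /** Reducer count derived from the estimated size of the data being grouped. */
  public static int reducersFor(long estimatedBytes) {
    return 1 + (int) (estimatedBytes / BYTES_PER_REDUCER);
  }

  public static void main(String[] args) {
    long input = 10_000_000_000L; // 10 GB coming out of GroupByKeyA

    // MyDoFn really emits 5x its input.
    long actual = estimateOutput(input, 5.0);
    // What the estimate looks like if MyDoFn forgets to override scaleFactor().
    long guessed = estimateOutput(input, DEFAULT_SCALE);

    System.out.println("reducers (true scale):    " + reducersFor(actual));
    System.out.println("reducers (default scale): " + reducersFor(guessed));
  }
}
```

Under these assumptions, a 5x DoFn without a scaleFactor() override is estimated as slightly shrinking its input (10 reducers instead of 51 for GroupByKeyB), so a purely size-driven planner would also keep it on the reduce side, which is exactly the placement the issue below complains about.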

> Planner should choose to run DoFns on Map or Reduce side depending on data size
> -------------------------------------------------------------------------------
>                 Key: CRUNCH-222
>                 URL:
>             Project: Crunch
>          Issue Type: New Feature
>    Affects Versions: 0.7.0
>            Reporter: Joseph Adler
> Hi guys,
> I was using Crunch to run a large data pipeline and came across a problem. In one stage
> (between two group functions), I have a DoFn that increases the output data size by a factor
> of 5. You can picture the flow like this:
>   GroupByKeyA -> MyDoFn -> GroupByKeyB
> On Hadoop, this translates to something like this:
>   -----------------------------         -----------------------------
>   | map1  -> reduce1 |   ->   | map2  -> reduce2 |
>   -----------------------------         -----------------------------
> Logically, you can run MyDoFn within either reduce1 or map2 and get the same results
> (the same data will be created as an input to reduce2).
> In the current implementation, MyDoFn always runs in reduce1. That means that the first
> map/reduce job will write out 5x more data than it would if MyDoFn ran in map2. For my job,
> this is a big deal: that's 5x more data written to and read from HDFS, which means a lot of
> extra work on HDFS and a lot of extra network traffic. Clearly, it would be more efficient
> to run MyDoFn in map2.
> I'd like to propose that we change the Crunch planner to take into account the output
> size of a DoFn (or set of DoFns) when deciding where it should be run. When scaleFactor()
> is <= 1 (the DoFn does not increase the data size), it should run on the reduce side. When
> scaleFactor() is > 1 (the DoFn increases the data size), it should run on the map side. (For
> a chain of n DoFns, this implies that Crunch will need to inspect up to n+1 different
> cumulative scales, one per possible split point, when deciding where to run each DoFn.)
> -- Joe
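One way to read the "n+1 different scales" remark: for a chain of n DoFns between two groupings there are n+1 possible split points (none of the DoFns in reduce1, only the first, and so on up to all n), and the data written between the two jobs is proportional to the cumulative product of the scale factors of the DoFns placed in reduce1. The Java sketch below picks the split point with the smallest cumulative product. It is a hypothetical illustration of the proposal under that interpretation, not Crunch's planner; SplitPointChooser and chooseSplit are made-up names, and ties are sent to the reduce side to mirror the "<= 1 runs on the reduce side" rule.

```java
import java.util.List;

/** Hypothetical sketch: choose where to split a DoFn chain between two groupByKeys. */
public class SplitPointChooser {

  /**
   * Returns k in [0, n]: DoFns 0..k-1 run in the first job's reducer and
   * DoFns k..n-1 run in the second job's mapper. The data written between
   * the jobs is proportional to the product of the first k scale factors,
   * so the k with the smallest cumulative product is chosen. On ties the
   * larger k wins, i.e. more work stays on the reduce side.
   */
  public static int chooseSplit(List<Double> scaleFactors) {
    double cumulative = 1.0; // k = 0: no DoFn applied yet
    double best = cumulative;
    int bestK = 0;
    for (int k = 1; k <= scaleFactors.size(); k++) {
      cumulative *= scaleFactors.get(k - 1);
      if (cumulative <= best) {
        best = cumulative;
        bestK = k;
      }
    }
    return bestK;
  }

  public static void main(String[] args) {
    // MyDoFn alone with a 5x blow-up: run it in map2 (split before it).
    System.out.println(chooseSplit(List.of(5.0)));
    // A filter (0.5) followed by MyDoFn (5.0): only the filter runs in reduce1.
    System.out.println(chooseSplit(List.of(0.5, 5.0)));
    // An expanding DoFn (2.0) followed by a strong filter (0.1): net 0.2, so both run in reduce1.
    System.out.println(chooseSplit(List.of(2.0, 0.1)));
  }
}
```

Choosing the split over the whole chain, rather than deciding each DoFn independently from its own scaleFactor(), handles cases like the last one, where an expanding DoFn is immediately followed by a filter that more than cancels it out.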

This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see:
