crunch-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Gabriel Reid (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (CRUNCH-278) Improvements to MapsideJoin code
Date Mon, 14 Oct 2013 07:23:42 GMT

    [ https://issues.apache.org/jira/browse/CRUNCH-278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13793967#comment-13793967
] 

Gabriel Reid commented on CRUNCH-278:
-------------------------------------

I think that there's still something that I think I'm not totally getting, or I'm looking
at this from the wrong angle.

Taking two different cases of the same kind of thing, I think we need to be able to distinguish
how we want them to be dealt with, as follows.

In this example, we want the DoFns leading up to the toBundle() call to be run in a separate
job:
{code:java}
PTable<K,V> hugeTable = pipeline.read(...);
PTable<K,V> muchSmallerTable = hugeTable.parallelDo(myFilterFn);
// Now MapsideJoinStrategy will call muchSmallerTable.toBundle(), but we want
// myFilterFn to have run in a separate MR job leading up to this MapsideJoin because
// it's reducing a huge amount of data into something that fits in memory
PTable<K,<U,V>> joined = new MapsideJoinStrategy().join(left, muchSmallerTable);
{code}

and in this example we want the DoFns to be run in memory while directly reading in smallTable
from the Source
{code:java}
PTable<K,V> smallTable = pipeline.read(...);
PTable<K,V> filteredSmallTable = smallTable.parallelDo(myFilterFn);
// Now MapsideJoinStrategy will call filteredSmallTable.toBundle(), and we want
// myFilterFn to be run in each of the mappers while materializing the contents 
// of smallTable, because there's no need to kick of a separate MR job for that
// due to the small size of the data
PTable<K,<U,V>> joined = new MapsideJoinStrategy().join(left, filteredSmallTable);
{code}

I think that the point is that there needs to be the ability in the API to set a spot where
we can say "everything from here on in will be run in memory". We can do that with something
on ParallelDoOptions, but I think we can run into the same problem again where it's hard to
define what will (or even what should) happen if you want to write a PCollection to storage
if it's got some in-memory operations defined somewhere further upstream in the Pipeline.

FWIW, I'm pretty much convinced that the MaterializedPCollection isn't the way to go for this.

> Improvements to MapsideJoin code
> --------------------------------
>
>                 Key: CRUNCH-278
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-278
>             Project: Crunch
>          Issue Type: Bug
>          Components: Core, MapReduce Patterns
>            Reporter: Josh Wills
>            Assignee: Josh Wills
>         Attachments: CRUNCH-278.patch
>
>
> The fact that we have special-case code in the MapsideJoinStrategy for the in-memory
and MR-based Pipeline instances has always bugged me, so I set out to eliminate the distinction
between the two impls by creating a new interface, ReadableSourceBundle<T>, that encapsulates
the MR and in-memory specific logic for doing mapside joins in order to remove the special-case
code in MapsideJoinStrategy and hopefully make other implementations that use our mapside-join
patterns much easier to test.



--
This message was sent by Atlassian JIRA
(v6.1#6144)

Mime
View raw message