crunch-user mailing list archives

From: Micah Whitacre <mkwhita...@gmail.com>
Subject: Re: PGroupedTable.combineValues should allow composition with PGroupedTable.mapValues
Date: Wed, 08 Jun 2016 16:50:45 GMT
Stan,
>> Hadoop may choose to skip the combiner entirely or selectively (based
>> on configuration)

Sorry, I'm not familiar with this config; can you provide a link to that
documentation?  Also, regarding performance, how much spilling occurred?

>> combineValues returns PTable instead of PGroupedTable.  Thus, the onus
>> is on the user to track additional state in order to simulate a reduce
>> call in parallelDo. I do understand that it's cleaner to have a uniform
>> interface for chaining DoFns in both map and reduce.  However, using a
>> DoFn (instead of a MapFn) on the output of combineValues does feel like
>> a leaky abstraction.  Any thoughts?

Can you describe the use case you are after a bit more?  Why do you
specifically need a PGroupedTable as the output of the combine function?
Would your combine function emit the keys that then need to be grouped
again?  Or is it emitting multiple values?  It sounds like you'd only be
emitting one value since you want to use a MapFn.

I agree it is unsatisfying that the developer would need to simulate
reduce calls in their Crunch code, since in general the goal is to not
think about the distinct map/reduce phases of an MR job but instead about
the functional transformations.  I'm just not sure I understand your use
case where you'd have to simulate the reduce, unless you were grouping by
a different key.
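
For what it's worth, here's a rough, untested sketch of the composition
I'd expect to work today, reusing the values PTable from my earlier
example below and a hypothetical UnionCombineFn that unions the grouped
sets.  Since the combiner emits a single combined value per key, a plain
mapValues on the resulting PTable can then compute the size:

import java.util.HashSet;
import java.util.Set;

import org.apache.crunch.CombineFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.MapFn;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.types.writable.Writables;

// Hypothetical combiner: unions all of the sets seen for a key into one
// set.  Its output type matches its value input type, so Crunch can apply
// it again to its own output on the reduce side.
static class UnionCombineFn extends CombineFn<String, Set<String>> {
  @Override
  public void process(Pair<String, Iterable<Set<String>>> input,
                      Emitter<Pair<String, Set<String>>> emitter) {
    Set<String> union = new HashSet<String>();
    for (Set<String> s : input.second()) {
      union.addAll(s);
    }
    emitter.emit(Pair.of(input.first(), union));
  }
}

// Combine to one Set<String> per key, then map each combined set to its size.
PTable<String, Set<String>> unioned =
    values.groupByKey().combineValues(new UnionCombineFn());
PTable<String, Integer> sizes =
    unioned.mapValues(new MapFn<Set<String>, Integer>() {
      @Override
      public Integer map(Set<String> s) {
        return s.size();
      }
    }, Writables.ints());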


On Tue, May 31, 2016 at 3:29 PM, Stan Rosenberg <stan.rosenberg@gmail.com>
wrote:

> David and Micah, thanks for your replies!  I got sidetracked and didn't
> get enough time to test it out.  As you said, parallelDo after
> combineValues works as expected.  However, I see a couple of problems with
> this approach.  First, the combiner seems to be executed on every reduce
> call, as witnessed by this stack trace (ReachPerMinute$2 is the closure
> corresponding to parallelDo):
>
>         at java.lang.Thread.dumpStack(Thread.java:1329)
>         at xxx.ReachPerMinute$2.process(ReachPerMinute.java:68)
>         at xxx.ReachPerMinute$2.process(ReachPerMinute.java:65)
>         at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>         at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>         at org.apache.crunch.fn.Aggregators$AggregatorCombineFn.process(Aggregators.java:525)
>         at org.apache.crunch.fn.Aggregators$AggregatorCombineFn.process(Aggregators.java:500)
>         at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>         at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>         at org.apache.crunch.MapFn.process(MapFn.java:34)
>         at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>         at org.apache.crunch.impl.mr.run.RTNode.processIterable(RTNode.java:113)
>         at org.apache.crunch.impl.mr.run.CrunchReducer.reduce(CrunchReducer.java:57)
>
> In contrast, Hadoop may choose to skip the combiner entirely or
> selectively (based on configuration).  My test Crunch program ran a few
> minutes slower with the combiner than without it; I wonder if that's due
> to the overhead of (combiner) function dispatch.
>
> Second, combineValues returns PTable instead of PGroupedTable.  Thus, the
> onus is on the user to track additional state in order to simulate a
> reduce call in parallelDo. I do understand that it's cleaner to have a
> uniform interface for chaining DoFns in both map and reduce.  However,
> using a DoFn (instead of a MapFn) on the output of combineValues does
> feel like a leaky abstraction.  Any thoughts?
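>
> To be concrete, by "track additional state" I mean something like the
> following rough, hypothetical sketch, which relies on the reduce-side
> input arriving sorted by key:
>
> // Hypothetical: "simulating a reduce" in a DoFn downstream of
> // combineValues by carrying state across process() calls and flushing
> // whenever the key changes.
> static class KeyedSum extends DoFn<Pair<String, Integer>, Pair<String, Integer>> {
>   private String currentKey;
>   private int sum;
>
>   @Override
>   public void process(Pair<String, Integer> input,
>                       Emitter<Pair<String, Integer>> emitter) {
>     if (currentKey != null && !currentKey.equals(input.first())) {
>       emitter.emit(Pair.of(currentKey, sum));
>       sum = 0;
>     }
>     currentKey = input.first();
>     sum += input.second();
>   }
>
>   @Override
>   public void cleanup(Emitter<Pair<String, Integer>> emitter) {
>     if (currentKey != null) {
>       emitter.emit(Pair.of(currentKey, sum));
>     }
>   }
> }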
>
> Best,
>
> stan
>
> On Tue, May 17, 2016 at 9:20 AM, Micah Whitacre <mkwhitacre@gmail.com>
> wrote:
>
>> Stan thanks for the question.
>>
>> If I'm understanding your question correctly, you would like to have code
>> like the following:
>>
>> PTable<String, Set<String>> values = ...;
>> PTable<String, Set<String>> values2 = ...;
>>
>> PTable<String, Integer> countedValues = values.union(values2)
>>     .groupByKey()
>>     .combineValues(combiner)
>>     .parallelDo(sizeCalculator, ptype);
>>
>> static class SizeCalculator
>>     extends MapFn<Pair<String, Set<String>>, Pair<String, Integer>> {
>>   ...
>> }
>>
>> // Note: combineValues takes a CombineFn<K, V> (a DoFn from
>> // Pair<K, Iterable<V>> to Pair<K, V>), not a plain MapFn.
>> static class Combiner extends CombineFn<String, Set<String>> {
>>   ...
>> }
>>
>> I don't think you want the combiner to actually change the type to
>> Integer: a combiner can be applied zero or more times, including to its
>> own output, so its output type has to match its value input type (a
>> requirement of a combiner both in Crunch and in normal MR).  I should
>> also clarify that Crunch will still do all of the work defined above
>> inside a single MapReduce job.  Crunch will run multiple DoFns inside
>> the map or reduce task depending on how it is most efficient to do the
>> work, similar to Spark.
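>>
>> To make that type constraint concrete, here is (if I recall the
>> signature correctly) the shape of Crunch's combiner type; note the
>> output value type T is the same as the grouped input value type:
>>
>> // A combiner consumes Pair<S, Iterable<T>> and emits Pair<S, T>, so
>> // its output can always be fed back in as one of its own inputs:
>> public abstract class CombineFn<S, T>
>>     extends DoFn<Pair<S, Iterable<T>>, Pair<S, T>> {
>>   // ...
>> }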
>>
>>
>> On Tue, May 17, 2016 at 1:01 AM, Stan Rosenberg <stan.rosenberg@gmail.com
>> > wrote:
>>
>>> Hi,
>>>
>>> I couldn't seem to find sufficient documentation or examples of using
>>> combiners in non-trivial ways. Say my map emits values of type
>>> Set<String>; after grouping by key I want to emit the _size_ of the
>>> union of the sets of strings, i.e., size(union(Iterable<Set<String>>)).
>>> Thus, the combiner's type is Iterable<Set<String>> -> Set<String>, but
>>> the reduce's type is Iterable<Set<String>> -> Int.
>>>
>>> To my knowledge, both MapReduce and Spark allow a combiner to have a
>>> result type different from the reducer's.  However, unless I missed
>>> something, this is not expressible in Crunch.  Shouldn't
>>> PGroupedTable.combineValues return PGroupedTable to allow composition
>>> with mapValues?
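>>>
>>> In other words, I'd like to be able to write something like the
>>> following (a hypothetical, non-compiling sketch of the proposed API,
>>> where input and unionFn are placeholders):
>>>
>>> // Proposed: combineValues would return PGroupedTable<String, Set<String>>,
>>> // so the reduce-side size computation composes directly via mapValues.
>>> PTable<String, Integer> sizes = input
>>>     .groupByKey()
>>>     .combineValues(unionFn)  // hypothetical return: PGroupedTable
>>>     .mapValues(new MapFn<Iterable<Set<String>>, Integer>() {
>>>       @Override
>>>       public Integer map(Iterable<Set<String>> sets) {
>>>         Set<String> union = new HashSet<String>();
>>>         for (Set<String> s : sets) {
>>>           union.addAll(s);
>>>         }
>>>         return union.size();
>>>       }
>>>     }, Writables.ints());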
>>>
>>> Thanks,
>>>
>>> stan
>>>
>>
>>
>
