crunch-user mailing list archives

From Stan Rosenberg <stan.rosenb...@gmail.com>
Subject Re: PGroupedTable.combineValues should allow composition with PGroupedTable.mapValues
Date Tue, 31 May 2016 20:29:47 GMT
David and Micah, thanks for your replies!  I got sidetracked and didn't get
enough time to test it out.  As you said, parallelDo after combineValues
works as expected.  However, I see a couple of problems with this
approach.  First, the combiner appears to run on every reduce call, as this
stack trace shows (ReachPerMinute$2 is the closure corresponding to the
parallelDo):

        at java.lang.Thread.dumpStack(Thread.java:1329)
        at xxx.ReachPerMinute$2.process(ReachPerMinute.java:68)
        at xxx.ReachPerMinute$2.process(ReachPerMinute.java:65)
        at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
        at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
        at org.apache.crunch.fn.Aggregators$AggregatorCombineFn.process(Aggregators.java:525)
        at org.apache.crunch.fn.Aggregators$AggregatorCombineFn.process(Aggregators.java:500)
        at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
        at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
        at org.apache.crunch.MapFn.process(MapFn.java:34)
        at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
        at org.apache.crunch.impl.mr.run.RTNode.processIterable(RTNode.java:113)
        at org.apache.crunch.impl.mr.run.CrunchReducer.reduce(CrunchReducer.java:57)

In contrast, Hadoop may skip the combiner entirely or run it only selectively
(depending on configuration).  My test Crunch program ran a few minutes slower
with the combiner than without it; I wonder whether that is due to the
overhead of dispatching to the combiner function.
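For reference, Hadoop treats the combiner as an optimization that may run zero, one, or several times per key, so a job's result must not depend on whether it ran. Below is a minimal plain-Java sketch of that property for the computation in this thread (union of Set<String> values, then size). It assumes no Crunch or Hadoop classes; the class name, the runJob helper, and the modelling of spills as lists are all hypothetical.

```java
import java.util.*;

// Hypothetical sketch (no Hadoop/Crunch classes): Hadoop may run a combiner
// zero, one, or several times per key, so the final answer must not depend
// on whether it ran. Spills are modelled as plain lists of map outputs.
public class OptionalCombinerSketch {

    // Combiner: union of sets (input and output values are both Set<String>).
    static Set<String> union(Collection<Set<String>> sets) {
        Set<String> out = new HashSet<>();
        for (Set<String> s : sets) {
            out.addAll(s);
        }
        return out;
    }

    // Reducer: size of the union of whatever values reach it.
    static int reduce(Collection<Set<String>> values) {
        return union(values).size();
    }

    // Simulates one key's shuffle: the combiner is applied once per spill,
    // or skipped entirely when runCombiner is false.
    static int runJob(List<List<Set<String>>> spills, boolean runCombiner) {
        List<Set<String>> reducerInput = new ArrayList<>();
        for (List<Set<String>> spill : spills) {
            if (runCombiner) {
                reducerInput.add(union(spill)); // one combined value per spill
            } else {
                reducerInput.addAll(spill);     // raw map outputs
            }
        }
        return reduce(reducerInput);
    }

    public static void main(String[] args) {
        List<List<Set<String>>> spills = List.of(
                List.of(Set.of("a", "b"), Set.of("b")),
                List.of(Set.of("c"), Set.of("a", "d")));
        System.out.println(runJob(spills, false)); // 4
        System.out.println(runJob(spills, true));  // 4
    }
}
```

Because set union is associative and commutative, both paths print 4. The sketch says nothing about the dispatch overhead measured above; it only shows why skipping the combiner is safe.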

Second, combineValues returns a PTable instead of a PGroupedTable.  Thus, the
onus is on the user to track additional state in order to simulate a reduce
call in parallelDo.  I do understand that it's cleaner to have a uniform
interface for chaining DoFns in both map and reduce.  However, using a DoFn
(instead of a MapFn) on the output of combineValues does feel like a leaky
abstraction.  Any thoughts?
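The composition being asked for (combine to one Set<String> per key, then map each combined value to its size, mapValues-style) can be modelled without Crunch as two per-key functions. This is a hypothetical sketch, not Crunch's API: a TreeMap stands in for the grouped table, and combine and combineThenMapValues are illustrative names.

```java
import java.util.*;
import java.util.function.Function;

// Hypothetical, dependency-free model of "combine, then mapValues" per key.
// A TreeMap stands in for a grouped table (key -> all values for that key).
public class CombineThenMapValues {

    // Combine step: Iterable<Set<String>> -> Set<String> (union of all sets).
    static Set<String> combine(Iterable<Set<String>> values) {
        Set<String> union = new HashSet<>();
        for (Set<String> s : values) {
            union.addAll(s);
        }
        return union;
    }

    // For each key: run the combine step once, then apply the value mapper,
    // so the output table has exactly one entry per key.
    static <R> Map<String, R> combineThenMapValues(
            Map<String, List<Set<String>>> grouped,
            Function<Set<String>, R> mapper) {
        Map<String, R> out = new TreeMap<>();
        for (Map.Entry<String, List<Set<String>>> e : grouped.entrySet()) {
            out.put(e.getKey(), mapper.apply(combine(e.getValue())));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<Set<String>>> grouped = new TreeMap<>();
        grouped.put("minute-1", List.of(Set.of("a", "b"), Set.of("b", "c")));
        grouped.put("minute-2", List.of(Set.of("x")));
        System.out.println(combineThenMapValues(grouped, Set::size)); // {minute-1=3, minute-2=1}
    }
}
```

In this model the combine step yields exactly one value per key, so the value mapper (Set::size) needs no extra state.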

Best,

stan

On Tue, May 17, 2016 at 9:20 AM, Micah Whitacre <mkwhitacre@gmail.com>
wrote:

> Stan thanks for the question.
>
> If I'm understanding your question correctly, you would like to have code
> like the following:
>
> PTable<String, Set<String>> values = ...;
> PTable<String, Set<String>> values2 = ...;
>
> PTable<String, Integer> countedValues = values.union(values2)
>     .groupByKey()
>     .combineValues(combiner)
>     .parallelDo(sizeCalculator, ptype);
>
> static class SizeCalculator
>     extends MapFn<Pair<String, Set<String>>, Pair<String, Integer>> {
> ...
> }
>
> // combineValues takes a CombineFn<K, V> (or an Aggregator<V>), so the
> // combiner's output value type stays Set<String>.
> static class Combiner extends CombineFn<String, Set<String>> {
> ...
> }
>
> I don't think you want the combiner to actually change the type to Integer,
> because that would make the operation non-commutative, and commutativity is
> a requirement of a combiner (both in Crunch and in plain MR).  I should also
> clarify that Crunch will still do all of the work defined above inside a
> single MapReduce job.  Crunch will run multiple DoFns inside the map or
> reduce task, depending on how the work can be done most efficiently, similar
> to Spark.
>
>
> On Tue, May 17, 2016 at 1:01 AM, Stan Rosenberg <stan.rosenberg@gmail.com>
> wrote:
>
>> Hi,
>>
>> I couldn't seem to find sufficient documentation or examples of using
>> combiners in non-trivial ways. Say my map emits values of type Set<String>;
>> after grouping by key I want to emit the _size_ of the union of the sets of
>> strings, i.e., size(union(Iterable<Set<String>>)).  Thus, the combiner's
>> type is Iterable<Set<String>> -> Set<String>, but the reducer's type is
>> Iterable<Set<String>> -> Int.
>>
>> To my knowledge, both MapReduce and Spark allow a combiner to have a
>> result type different from the reducer's.  However, unless I missed
>> something, this is not expressible in Crunch.  Shouldn't
>> PGroupedTable.combineValues return a PGroupedTable to allow composition
>> with mapValues?
>>
>> Thanks,
>>
>> stan
>>
>
>

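Micah's point that the combiner should keep the Set<String> type (rather than emitting an Integer) can be checked with a small plain-Java sketch. It assumes one key whose map output arrives in two hypothetical partial groups (spills) that share an element. Collapsing each group to an Integer early and adding the sizes overcounts the shared element, while merging the sets first and taking the size only at the end gives the correct answer. Class and method names are illustrative only.

```java
import java.util.*;

// Hypothetical sketch: why a combiner for "size of union" should emit
// Set<String>, not Integer. Partial sizes cannot be merged correctly,
// because elements shared between partial groups would be counted twice.
public class UnionSizeCombinerCheck {

    // Type-preserving combine: several Set<String> values merged into one.
    static Set<String> unionOf(List<Set<String>> parts) {
        Set<String> union = new HashSet<>();
        for (Set<String> p : parts) {
            union.addAll(p);
        }
        return union;
    }

    // Type-changing "combine": each partial group collapsed to an int up front,
    // then the partial sizes are added.
    static int sumOfPartialSizes(List<List<Set<String>>> partialGroups) {
        int total = 0;
        for (List<Set<String>> group : partialGroups) {
            total += unionOf(group).size();
        }
        return total;
    }

    // Type-preserving path: combine each partial group into a set, merge the
    // sets, and take the size only in the final (reduce-side) step.
    static int sizeOfMergedUnions(List<List<Set<String>>> partialGroups) {
        List<Set<String>> combined = new ArrayList<>();
        for (List<Set<String>> group : partialGroups) {
            combined.add(unionOf(group));
        }
        return unionOf(combined).size();
    }

    public static void main(String[] args) {
        // Two partial groups for the same key; "b" appears in both.
        List<List<Set<String>>> spills = List.of(
                List.of(Set.of("a", "b")),
                List.of(Set.of("b", "c")));
        System.out.println(sumOfPartialSizes(spills));  // 4 (overcounts "b")
        System.out.println(sizeOfMergedUnions(spills)); // 3
    }
}
```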