crunch-dev mailing list archives

From Gabriel Reid <gabriel.r...@gmail.com>
Subject Re: thorny Aggregator-on-Spark design problem
Date Thu, 05 Dec 2013 19:33:11 GMT
Ugh, hit send too soon, sorry about that. Restarting...

As stated before, I'm clueless about Spark, so I'll have to start with
a basic question: is all of this stuff being run within one thread? Or
is there thread-safety to be worried about?

If the aggregation is being done in a single thread, then I've got
another idea (although no promises on how good it is). There could be
a single Crunch Aggregator (at least a single one per thread), and the
aggregators used by createCombiner, mergeValue, and mergeCombiners all
have a reference to the single combiner.

A call to mergeValue would then look like this (ignore the bad
syntax and probably wrong API):
     mergeValue(combiner, value) {
         combiner.sharedCrunchCombiner.reset();
         combiner.sharedCrunchCombiner.add(combiner.currentValue);
         combiner.sharedCrunchCombiner.add(value);
         combiner.currentValue = combiner.sharedCrunchCombiner.getResults()[0];
     }
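In runnable form, the idea might look roughly like the sketch below. The SimpleAggregator interface and all the names here are illustrative stand-ins, not the actual Crunch or Spark API; a trivial sum aggregator is used just to exercise the pattern:

```java
import java.util.ArrayList;
import java.util.List;

// Cut-down, hypothetical stand-in for Crunch's Aggregator interface.
interface SimpleAggregator<T> {
    void reset();
    void update(T value);
    Iterable<T> results();
}

// A Spark-style combiner that delegates all merging to one shared,
// per-thread aggregator instead of holding its own aggregator.
class SharedAggregatorCombiner {
    private final SimpleAggregator<Long> sharedCrunchAggregator;
    private long currentValue;

    SharedAggregatorCombiner(SimpleAggregator<Long> shared, long initialValue) {
        this.sharedCrunchAggregator = shared;
        this.currentValue = initialValue;
    }

    // Spark's mergeValue: fold a new value into this combiner by resetting
    // the shared aggregator and running both values through it.
    void mergeValue(long value) {
        sharedCrunchAggregator.reset();
        sharedCrunchAggregator.update(currentValue);
        sharedCrunchAggregator.update(value);
        currentValue = sharedCrunchAggregator.results().iterator().next();
    }

    long currentValue() {
        return currentValue;
    }
}

public class SharedAggregatorDemo {
    public static void main(String[] args) {
        // A trivial sum aggregator, purely to make the sketch runnable.
        SimpleAggregator<Long> sum = new SimpleAggregator<Long>() {
            private long total;
            public void reset() { total = 0; }
            public void update(Long value) { total += value; }
            public Iterable<Long> results() {
                List<Long> out = new ArrayList<Long>();
                out.add(total);
                return out;
            }
        };
        SharedAggregatorCombiner combiner = new SharedAggregatorCombiner(sum, 1L);
        combiner.mergeValue(2L);
        combiner.mergeValue(3L);
        System.out.println(combiner.currentValue()); // 6 for the sum aggregator
    }
}
```

mergeCombiners would follow the same shape: reset the shared aggregator, feed it both combiners' current values, and take the single result.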

That could also be done by building up a list of values in each of the
combiners, and only using the shared Crunch combiner to merge them
once the list reaches a certain size or something.
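A sketch of that buffering variant is below. Again, the names and threshold are illustrative, and a plain sum stands in for "run the buffered values through the shared Crunch aggregator":

```java
import java.util.ArrayList;
import java.util.List;

// Buffering variant: values pile up in a local list and are only folded
// through the (per-thread) shared aggregator in batches.
class BufferingCombiner {
    private final List<Long> buffer = new ArrayList<Long>();
    private final int flushThreshold;
    private long runningTotal; // state carried over from the last flush

    BufferingCombiner(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    void mergeValue(long value) {
        buffer.add(value);
        if (buffer.size() >= flushThreshold) {
            flush();
        }
    }

    // Stands in for running the buffered values through the shared
    // Crunch aggregator; a simple sum keeps the sketch runnable.
    private void flush() {
        for (long v : buffer) {
            runningTotal += v;
        }
        buffer.clear();
    }

    long result() {
        flush(); // fold any values still sitting in the buffer
        return runningTotal;
    }
}

public class BufferingCombinerDemo {
    public static void main(String[] args) {
        BufferingCombiner combiner = new BufferingCombiner(3);
        for (long v = 1; v <= 5; v++) {
            combiner.mergeValue(v);
        }
        System.out.println(combiner.result()); // 15
    }
}
```

mergeCombiners would presumably concatenate the two buffers and flush if the combined size crosses the threshold.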

On the other hand, if this stuff isn't all local to a single thread,
aren't there a lot of other things to worry about, even outside the
realm of Aggregators? Currently nothing in Crunch DoFns needs to be
thread-safe, for obvious reasons. Although this probably wouldn't be
an issue in 99% of cases, I would expect things to break if DoFns or
other Crunch primitives are called simultaneously by multiple threads.


- Gabriel



On Thu, Dec 5, 2013 at 8:27 PM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
> On Thu, Dec 5, 2013 at 7:31 PM, Josh Wills <jwills@cloudera.com> wrote:
>> Hey all,
>>
>> So I'm working away on CRUNCH-296, the Crunch-on-Spark patch, and I've run
>> into a place where there's a mismatch between the frameworks: combiners,
>> and how Crunch uses Aggregators to express combine operations.
>>
>> The current Crunch Aggregator<T> assumes that it will see all of the values
>> for a given key all at once, because that's how things work in MapReduce
>> Combiner and Reducer operations. That isn't true in Spark; all of Spark's
>> aggregations are hash based, so when you do a combineByKey() operation in
>> Spark, Spark creates a HashMap and then creates and updates combiner
>> instances via three functions:
>>
>> createCombiner: V => C (takes in a value, returns a combiner object)
>> mergeValue: (V, C) => C (takes in a new value and an existing combiner and
>> updates the combiner)
>> mergeCombiners: (C, C) => C (take in two combiners and merge them together)
>>
>> I could do a hack that would make Aggregators usable in the way that Spark
>> expects them to be used -- after all, Aggregator<T> implements Serializable,
>> so there's no issue with serializing Aggregators across the wire in either
>> Spark or MapReduce by using PTypes.serializables() with the PTypeFamily
>> of the key. The requirement would be that Aggregators would need to be
>> Cloneable-ish (although not using Cloneable, b/c Josh Bloch taught me that
>> was evil), because certain Aggregators have state associated with them
>> (e.g., string concat) that would need to be passed along (this sort of
>> vaguely recalls the AggregatorFactory error I made many moons ago). What I
>> would probably end up with is a default impl that did a
>> serialize/deserialize round trip to create a new instance of the Aggregator,
>> which subclasses that knew better could override to work optimally.
>>
>> That said, that's not the greatest thing ever, and so I'm wondering if
>> anyone has thought about what a generalization of aggregator would look
>> like. I am even open to the use of terms like "monoid" if you feel like
>> there's no other way to express your ideas. ;-)
>>
>> J
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
