crunch-dev mailing list archives

From: Gabriel Reid <gabriel.r...@gmail.com>
Subject: Re: thorny Aggregator-on-Spark design problem
Date: Thu, 05 Dec 2013 19:27:57 GMT
As stated before, I'm clueless about Spark, so I'll have to start with
a basic question: is all of this stuff being run within one thread? Or
is there thread-safety to be worried about?

If the aggregation is being done in a single thread, then I've got
another idea (although no promises on how good it is). There could be
a single Crunch Aggregator (or at least a single one per thread), and
the combiner objects used by createCombiner, mergeValue, and
mergeCombiners would all hold a reference to that single aggregator.

A call to mergeValue would then look something like this (rough
pseudocode, assuming the aggregator emits a single result):

     mergeValue(combiner, value) {
         combiner.sharedCrunchAggregator.reset();
         combiner.sharedCrunchAggregator.update(combiner.currentValue);
         combiner.sharedCrunchAggregator.update(value);
         combiner.currentValue = getOnlyElement(combiner.sharedCrunchAggregator.results());
         return combiner;
     }
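
To round the idea out, here is a rough Java sketch of the other two
functions under the same assumptions -- the CombinerState wrapper, the
single-result aggregator, and the single-threaded task are all
assumptions here, not anything in CRUNCH-296:

    import org.apache.crunch.Aggregator;
    import com.google.common.collect.Iterables;

    // Hypothetical per-key state: the running value plus a reference to the
    // one Aggregator shared by every combiner created on this thread.
    class CombinerState<T> {
      final Aggregator<T> sharedCrunchAggregator;
      T currentValue;

      CombinerState(Aggregator<T> shared, T initial) {
        this.sharedCrunchAggregator = shared;
        this.currentValue = initial;
      }
    }

    // createCombiner: the first value seen for a key becomes the current value.
    static <T> CombinerState<T> createCombiner(Aggregator<T> shared, T value) {
      return new CombinerState<T>(shared, value);
    }

    // mergeCombiners: replay both running values through the shared
    // aggregator, again assuming it emits exactly one result.
    static <T> CombinerState<T> mergeCombiners(CombinerState<T> left,
                                               CombinerState<T> right) {
      left.sharedCrunchAggregator.reset();
      left.sharedCrunchAggregator.update(left.currentValue);
      left.sharedCrunchAggregator.update(right.currentValue);
      left.currentValue = Iterables.getOnlyElement(left.sharedCrunchAggregator.results());
      return left;
    }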

On Thu, Dec 5, 2013 at 7:31 PM, Josh Wills <jwills@cloudera.com> wrote:
> Hey all,
>
> So I'm working away on CRUNCH-296, the Crunch-on-Spark patch, and I've run
> into a place where there's a mismatch between the frameworks: combiners,
> and how Crunch uses Aggregators to express combine operations.
>
> The current Crunch Aggregator<T> assumes that it will see all of the values
> for a given key all at once, because that's how things work in MapReduce
> Combiner and Reducer operations. That isn't true in Spark; all of Spark's
> aggregations are hash based, so when you do a combineByKey() operation in
> Spark, Spark creates a HashMap and then creates and updates combiner
> instances via three functions that you define:
>
> createCombiner: V => C (takes in a value and returns a new combiner object)
> mergeValue: (C, V) => C (takes in an existing combiner and a new value and
> updates the combiner)
> mergeCombiners: (C, C) => C (takes in two combiners and merges them together)
>
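> For concreteness, a minimal sketch of what Spark's Java API expects for
> those three functions (a toy per-key sum; illustrative only, not code from
> the patch):
>
>     import org.apache.spark.api.java.JavaPairRDD;
>     import org.apache.spark.api.java.function.Function;
>     import org.apache.spark.api.java.function.Function2;
>
>     // Toy example: sum Integer values per key into a Long combiner.
>     static JavaPairRDD<String, Long> sumPerKey(JavaPairRDD<String, Integer> values) {
>       return values.combineByKey(
>           new Function<Integer, Long>() {          // createCombiner: V => C
>             public Long call(Integer v) { return v.longValue(); }
>           },
>           new Function2<Long, Integer, Long>() {   // mergeValue: (C, V) => C
>             public Long call(Long c, Integer v) { return c + v; }
>           },
>           new Function2<Long, Long, Long>() {      // mergeCombiners: (C, C) => C
>             public Long call(Long c1, Long c2) { return c1 + c2; }
>           });
>     }
>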
> I could do a hack that would make Aggregators usable in the way that Spark
> expects them to be used-- after all, Aggregator<T> implements Serializable,
> so there's no issue with serializing Aggregators across the wire in either
> Spark or MapReduce by using a PTypes.serializables() with the PTypeFamily
> of the key. The requirement would be that Aggregators would need to be
> Cloneable-ish (although not using Cloneable b/c Josh Bloch taught me that
> was evil), because certain Aggregators have state associated with them
> (e.g., string concat) that would need to be passed along (this sort of
> vaguely recalls the AggregatorFactory error I made many moons ago.) What I
> would probably end up with is a default impl that does a serialize/deserialize
> round trip to create a new instance of the Aggregator, which subclasses that
> know better could override to do something more efficient.
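>
> For what it's worth, that default impl could be as dumb as a serialization
> round trip (sketch only; copyViaSerialization is a made-up name, not an
> existing Crunch method):
>
>     import java.io.*;
>     import org.apache.crunch.Aggregator;
>
>     // Default "clone" for an Aggregator: write the template out and read it
>     // back in. This works because Aggregator<T> implements Serializable;
>     // subclasses with a cheaper way to copy themselves could override it.
>     static <T> Aggregator<T> copyViaSerialization(Aggregator<T> template) {
>       try {
>         ByteArrayOutputStream bytes = new ByteArrayOutputStream();
>         ObjectOutputStream out = new ObjectOutputStream(bytes);
>         out.writeObject(template);
>         out.close();
>         ObjectInputStream in = new ObjectInputStream(
>             new ByteArrayInputStream(bytes.toByteArray()));
>         @SuppressWarnings("unchecked")
>         Aggregator<T> copy = (Aggregator<T>) in.readObject();
>         return copy;
>       } catch (Exception e) {
>         throw new RuntimeException("Could not copy aggregator", e);
>       }
>     }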
>
> That said, that's not the greatest thing ever, and so I'm wondering if
> anyone has thought about what a generalization of aggregator would look
> like. I am even open to the use of terms like "monoid" if you feel like
> there's no other way to express your ideas. ;-)
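>
> (Just to make that concrete, one speculative shape -- names made up on the
> spot, not an existing Crunch or Spark interface -- would be to pull the
> three Spark-style operations into something small and Serializable:)
>
>     // Speculative sketch of a generalized, Spark-friendly combine interface.
>     public interface CombineFn<V, C> extends java.io.Serializable {
>       C createAccumulator(V value);        // V => C
>       C addValue(C accumulator, V value);  // (C, V) => C
>       C merge(C left, C right);            // (C, C) => C; must be associative
>     }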
>
> J
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
