crunch-dev mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: thorny Aggregator-on-Spark design problem
Date Thu, 05 Dec 2013 19:34:59 GMT
Spark is not multithreaded, AFAIK.

I like this idea, and I don't see a reason why it wouldn't work. Let me
whip something up and see how it looks-- thanks!


On Thu, Dec 5, 2013 at 11:33 AM, Gabriel Reid <gabriel.reid@gmail.com> wrote:

> Ugh, hit send too soon, sorry about that. Restarting...
>
> As stated before, I'm clueless about Spark, so I'll have to start with
> a basic question: is all of this stuff being run within one thread? Or
> is there thread-safety to be worried about?
>
> If the aggregation is being done in a single thread, then I've got
> another idea (although no promises on how good it is). There could be
> a single Crunch Aggregator (at least a single one per thread), and the
> combiners used by createCombiner, mergeValue, and mergeCombiners would all
> have a reference to that single Aggregator.
>
> A call to mergeValue would then look like this (ignore the bad
> syntax and probably wrong API):
>      mergeValue(combiner, value) {
>          combiner.sharedCrunchCombiner.reset();
>          combiner.sharedCrunchCombiner.add(combiner.currentValue);
>          combiner.sharedCrunchCombiner.add(value);
>          combiner.currentValue = combiner.sharedCrunchCombiner.getResults()[0];
>      }
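>
> Fleshed out a bit in Java, that might be something like the following
> (still a sketch -- the Combined holder class is made up, and I'm
> guessing at the actual Aggregator method names):
>
>      // the combiner object: a running value plus a reference to the
>      // single Aggregator shared by every combiner on this thread
>      class Combined<T> {
>          Aggregator<T> sharedCrunchCombiner;
>          T currentValue;
>      }
>
>      Combined<T> mergeValue(Combined<T> combiner, T value) {
>          combiner.sharedCrunchCombiner.reset();
>          combiner.sharedCrunchCombiner.update(combiner.currentValue);
>          combiner.sharedCrunchCombiner.update(value);
>          // assumes a single-result aggregator; take the first result
>          combiner.currentValue =
>              combiner.sharedCrunchCombiner.results().iterator().next();
>          return combiner;
>      }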
>
> That could also be done by building up lists of values in each of the
> combiners, and only using the shared Crunch combiner to merge them
> once they got to a certain size or something.
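>
> Buffered, that could be something like this (BUF_SIZE and the buffer
> field are pulled out of thin air):
>
>      void addValue(Combined<T> combiner, T value) {
>          combiner.buffer.add(value);  // buffer being a List<T> on the combiner
>          if (combiner.buffer.size() >= BUF_SIZE) {
>              Aggregator<T> agg = combiner.sharedCrunchCombiner;
>              agg.reset();
>              for (T v : combiner.buffer) {
>                  agg.update(v);
>              }
>              combiner.buffer.clear();
>              // collapse the buffer down to a single partial result
>              combiner.buffer.add(agg.results().iterator().next());
>          }
>      }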
>
> On the other hand, if this stuff isn't all local to a single thread:
> are there not a lot of other things that might need to be worried
> about, even outside the realm of Aggregators? Currently nothing in
> Crunch DoFns needs to be thread-safe, for obvious reasons. Although
> this probably wouldn't be an issue in 99% of the cases, I would think
> that there will be things that break if DoFns or other Crunch
> primitives are called simultaneously by multiple threads.
>
>
> - Gabriel
>
>
>
> On Thu, Dec 5, 2013 at 8:27 PM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
> > As stated before, I'm clueless about Spark, so I'll have to start with
> > a basic question: is all of this stuff being run within one thread? Or
> > is there thread-safety to be worried about?
> >
> > If the aggregation is being done in a single thread, then I've got
> > another idea (although no promises on how good it is). There could be
> > a single Crunch Aggregator (at least a single one per thread), and the
> > combiners used by createCombiner, mergeValue, and mergeCombiners would all
> > have a reference to that single Aggregator.
> >
> > A call to mergeValue would then look like this:
> >      mergeValue(combiner, value) {
> >          combiner.sharedCrunchCombiner.reset();
> >          combiner.sharedCrunchCombiner.add(combiner.sharedCrunchCombiner.currentValue)
> >
> >      }
> >
> > On Thu, Dec 5, 2013 at 7:31 PM, Josh Wills <jwills@cloudera.com> wrote:
> >> Hey all,
> >>
> >> So I'm working away on CRUNCH-296, the Crunch-on-Spark patch, and I've run
> >> into a place where there's a mismatch between the frameworks: combiners,
> >> and how Crunch uses Aggregators to express combine operations.
> >>
> >> The current Crunch Aggregator<T> assumes that it will see all of the values
> >> for a given key all at once, because that's how things work in MapReduce
> >> Combiner and Reducer operations. That isn't true in Spark; all of Spark's
> >> aggregations are hash-based, so when you do a combineByKey() operation in
> >> Spark, Spark creates a HashMap and then creates and updates combiner
> >> instances by defining three functions:
> >>
> >> createCombiner: V => C (takes in a value, returns a combiner object)
> >> mergeValue: (C, V) => C (takes in an existing combiner and a new value, and
> >> updates the combiner)
> >> mergeCombiners: (C, C) => C (takes in two combiners and merges them together)
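> >>
> >> To make that concrete, mapping an Aggregator onto those three functions
> >> might look roughly like this (just a sketch, not working code; copyOf()
> >> and the template instance are the hypothetical clone machinery I get to
> >> below):
> >>
> >>     // C is just an Aggregator<V>, copied and reset per key
> >>     Aggregator<V> createCombiner(V value) {
> >>         Aggregator<V> agg = copyOf(template);
> >>         agg.reset();
> >>         agg.update(value);
> >>         return agg;
> >>     }
> >>
> >>     Aggregator<V> mergeValue(Aggregator<V> agg, V value) {
> >>         agg.update(value);
> >>         return agg;
> >>     }
> >>
> >>     Aggregator<V> mergeCombiners(Aggregator<V> a, Aggregator<V> b) {
> >>         // fold b's partial results back into a
> >>         for (V v : b.results()) {
> >>             a.update(v);
> >>         }
> >>         return a;
> >>     }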
> >>
> >> I could do a hack that would make Aggregators usable in the way that Spark
> >> expects them to be used-- after all, Aggregator<T> implements Serializable,
> >> so there's no issue with serializing Aggregators across the wire in either
> >> Spark or MapReduce by using a PTypes.serializables() with the PTypeFamily
> >> of the key. The requirement would be that Aggregators would need to be
> >> Cloneable-ish (although not using Cloneable b/c Josh Bloch taught me that
> >> was evil), because certain Aggregators have state associated with them
> >> (e.g., string concat) that would need to be passed along (this sort of
> >> vaguely recalls the AggregatorFactory error I made many moons ago). What I
> >> would probably end up with is a default impl that did a serialize/deserialize
> >> round trip to create a new instance of the Aggregator, which subclasses that
> >> knew better could override to work optimally.
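> >>
> >> i.e., the default would be the boring serialization round trip, more or
> >> less this (sketch; ignoring exception handling and the unchecked cast):
> >>
> >>     static <T> Aggregator<T> copyOf(Aggregator<T> agg) throws Exception {
> >>         // plain java.io serialization; works because Aggregator<T>
> >>         // implements Serializable
> >>         ByteArrayOutputStream bytes = new ByteArrayOutputStream();
> >>         ObjectOutputStream out = new ObjectOutputStream(bytes);
> >>         out.writeObject(agg);
> >>         out.close();
> >>         ObjectInputStream in = new ObjectInputStream(
> >>             new ByteArrayInputStream(bytes.toByteArray()));
> >>         return (Aggregator<T>) in.readObject();
> >>     }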
> >>
> >> That said, that's not the greatest thing ever, and so I'm wondering if
> >> anyone has thought about what a generalization of aggregator would look
> >> like. I am even open to the use of terms like "monoid" if you feel like
> >> there's no other way to express your ideas. ;-)
> >>
> >> J
> >> --
> >> Director of Data Science
> >> Cloudera <http://www.cloudera.com>
> >> Twitter: @josh_wills <http://twitter.com/josh_wills>
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
