crunch-dev mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: Process of CombineFn<S,T> returns <S,U>?
Date Sat, 19 Oct 2013 00:34:11 GMT
I'm certainly not opposed to having something like this. Spark makes this
distinction via Accumulable vs. Accumulator:

http://spark.incubator.apache.org/docs/0.8.0/api/core/index.html#org.apache.spark.Accumulable
http://spark.incubator.apache.org/docs/0.8.0/api/core/index.html#org.apache.spark.Accumulator

Maybe we want something like "Aggregatable<R, T>" to go along with our
Aggregator<T> (which could extend Aggregatable<T, T>)?
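
A minimal sketch of what that split could look like, in plain Java with no Crunch dependency. Everything here is hypothetical: the method names (reset, update, merge, result) are assumptions made for illustration, and the nested Aggregator below is a stand-in, not the real org.apache.crunch.Aggregator.

```java
import java.util.ArrayList;
import java.util.List;

public class AggregatableSketch {

    /** Hypothetical: folds inputs of type T into a result of type R. */
    public interface Aggregatable<R, T> {
        void reset();
        void update(T value);
        // Fold in a partial result, e.g. one produced by a map-side combine.
        void merge(R partial);
        R result();
    }

    /** Hypothetical same-type special case (stand-in, not Crunch's Aggregator). */
    public interface Aggregator<T> extends Aggregatable<T, T> {}

    /** Different input and result types: Double in, List<Double> out. */
    public static class CollectDoubles implements Aggregatable<List<Double>, Double> {
        private final List<Double> values = new ArrayList<>();

        @Override public void reset() { values.clear(); }
        @Override public void update(Double value) { values.add(value); }
        @Override public void merge(List<Double> partial) { values.addAll(partial); }
        @Override public List<Double> result() { return new ArrayList<>(values); }
    }

    /** Same input and result type: a running sum. */
    public static class SumDoubles implements Aggregator<Double> {
        private double sum;

        @Override public void reset() { sum = 0.0; }
        @Override public void update(Double value) { sum += value; }
        @Override public void merge(Double partial) { sum += partial; }
        @Override public Double result() { return sum; }
    }

    public static void main(String[] args) {
        // "Map side": aggregate two raw values into a partial result.
        CollectDoubles mapSide = new CollectDoubles();
        mapSide.update(1.0);
        mapSide.update(2.0);

        // "Reduce side": merge the partial result, then one more raw value.
        CollectDoubles reduceSide = new CollectDoubles();
        reduceSide.merge(mapSide.result());
        reduceSide.update(3.0);
        System.out.println(reduceSide.result());
    }
}
```

The separate merge method is the design choice that matters in this sketch: once R differs from T, partial results can no longer be fed back through update, so the two paths need distinct entry points.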



On Fri, Oct 18, 2013 at 1:36 PM, Gabriel Reid <gabriel.reid@gmail.com> wrote:

> This use case (map/combine <K,V> to <K,U>) seems to come up
> repeatedly. The solution (map <K,V> to <K, Collection<V>> and then
> combine) works but is also pretty unintuitive.
>
> Any thoughts on adding a util in Crunch to do this? It would basically
> just need to be a static util method that takes a MapFn<<K,V><K,U>>
> and a CombineFn<K,U> and would take care of the singleton collection
> mapping stuff internally. On the one hand I'm thinking that this could
> be pretty useful, but I'm not sure if it would make things more
> intuitive or possibly have the reverse effect.
>
> Any opinions? I'm up for putting it together if people think it's worth it.
>
> - Gabriel
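
One possible shape for such a util, sketched in plain Java over in-memory collections rather than the Crunch API. The class name, the method name mapThenCombine and its signature are all assumptions for illustration; a real Crunch version would take a MapFn and a CombineFn and return a PTable.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class MapThenCombine {

    /** Small helper to build key/value records for the example. */
    public static <K, V> Map.Entry<K, V> entry(K key, V value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    /**
     * Hypothetical util: lift each V into a "singleton" U, then combine the
     * U values per key, so the caller never sees the intermediate step.
     * The lifted value must be non-null (Map.merge rejects null values).
     */
    public static <K, V, U> Map<K, U> mapThenCombine(
            Iterable<Map.Entry<K, V>> records,
            Function<V, U> toSingleton,
            BinaryOperator<U> combiner) {
        Map<K, U> combined = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            U lifted = toSingleton.apply(record.getValue());
            combined.merge(record.getKey(), lifted, combiner);
        }
        return combined;
    }

    public static void main(String[] args) {
        // The <String, Double> to <String, Collection<Double>> case from the thread.
        List<Map.Entry<String, Double>> input = Arrays.asList(
                entry("a", 1.0), entry("b", 2.0), entry("a", 3.0));

        // Concatenate two partial lists into a new list.
        BinaryOperator<List<Double>> concat = (x, y) -> {
            List<Double> out = new ArrayList<>(x);
            out.addAll(y);
            return out;
        };

        Map<String, List<Double>> result =
                mapThenCombine(input, v -> Collections.singletonList(v), concat);
        System.out.println(result);
    }
}
```

The combiner is expected to be associative here, since in a real job it could run on partial groups map-side and again on the reduce side.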
>
>
> On Fri, Oct 18, 2013 at 4:14 PM, Micah Whitacre <mkwhit@gmail.com> wrote:
> > Thinking about the technical issues: at first glance you could say the
> > restriction is just the way the Java generics are written for the
> > CombineFn class, but if you think about what is actually happening, it
> > would be awkward to support changing types in the CombineFn, especially
> > when it is paired with a GroupByKey.  As I showed in the example, the
> > CombineFn essentially bookends the GBK operation by performing processing
> > on the types before and after the sorting.  The GBK's types describe the
> > output of the map phase and the input to the reduce.  If the CombineFn
> > changed the types, then the output wouldn't match the types described by
> > the GBK.  I'm guessing this could lead to a number of problems trying to
> > compute the types and plan for the job.
> >
> >
> > On Fri, Oct 18, 2013 at 8:55 AM, Micah Whitacre <mkwhit@gmail.com> wrote:
> >
> >> I'm not sure I follow how there is extra effort involved.  Are you
> >> talking about development effort or processing effort?  From a
> >> development standpoint, in both cases you need to write code that
> >> translates T to U and combines the values.  The difference is whether
> >> that logic exists inside of a single DoFn or is split into a MapFn +
> >> CombineFn.  So the development effort should be the same.
> >>
> >>
> >> On Fri, Oct 18, 2013 at 8:11 AM, Chandan Biswas <cbiswas1983@gmail.com> wrote:
> >>
> >>> Yeah, I see what you are talking about. But it will take extra effort
> >>> to convert to the U type. So, is there any specific reason why CombineFn
> >>> was initially designed so that it does not allow a different return
> >>> type? Were there any constraints (design / complexity) that led to
> >>> this restriction?
> >>> Thanks,
> >>>
> >>>
> >>> On Thu, Oct 17, 2013 at 8:47 PM, Micah Whitacre <mkwhit@gmail.com> wrote:
> >>>
> >>> > Chandan,
> >>> >    So let's apply your situation to the types and conversion that is
> >>> > proposed and break down where the logic will be applied.  Say we have
> >>> > a PCollection like the following:
> >>> >
> >>> > Mapper 1:
> >>> > <id1, "Hello">
> >>> > <id2, "World">
> >>> > <id1, "I like turtles">
> >>> >
> >>> > Mapper 2
> >>> > <id2, "Goodbye">
> >>> >
> >>> > This will be represented by the PTable<String, Comment>.  We then
> >>> > apply a MapFn to transform it into PTable<String, Book> and we'd get
> >>> > the following in our PCollection:
> >>> >
> >>> > Mapper 1
> >>> > <id1, <"Hello", 1>>
> >>> > <id2, <"World", 1>>
> >>> > <id1, <"I like turtles", 1>>
> >>> >
> >>> > Mapper 2
> >>> > <id2, <"Goodbye", 1>>
> >>> >
> >>> > Then if we were to use the GBK + CombineFn, the output of the map
> >>> > phase would be:
> >>> >
> >>> > Mapper 1
> >>> > <id2, <"World", 1>>
> >>> > <id1, <"I like turtles", 2>>
> >>> >
> >>> > Mapper 2
> >>> > <id2, <"Goodbye", 1>>
> >>> >
> >>> > Notice Mapper 1 would only be emitting 2 items instead of 3, and
> >>> > therefore less data is sent over the wire and has to be sorted.  Also,
> >>> > in the reducer, after the GBK is completed, the CombineFn would finish
> >>> > its work and you'd get the following:
> >>> >
> >>> > Reducer 1
> >>> > <id2, <"Goodbye", 2>>
> >>> > <id1, <"I like turtles", 2>>
> >>> >
> >>> > The only case where this would not improve performance is if you
> >>> > never emit data for the same key from the same mapper or your mapper
> >>> > doesn't reduce the size of the data.
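
The walkthrough above can be replayed in plain Java, as a rough sketch with no Crunch or Hadoop involved. The Book class, its merge rule (keep the longest comment, sum the counts) and the helper names are assumptions based on the types described in the thread; the same combine function is applied once per mapper and once more in the reducer.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CombinerWalkthrough {

    /** Stand-in for the thread's Book value: longest comment plus a count. */
    public static final class Book {
        public final String lengthiestComment;
        public final int noOfComments;

        public Book(String lengthiestComment, int noOfComments) {
            this.lengthiestComment = lengthiestComment;
            this.noOfComments = noOfComments;
        }

        /** Keep the longer comment and add the counts. */
        public Book merge(Book other) {
            String longest = other.lengthiestComment.length() > lengthiestComment.length()
                    ? other.lengthiestComment : lengthiestComment;
            return new Book(longest, noOfComments + other.noOfComments);
        }

        @Override
        public String toString() {
            return "<\"" + lengthiestComment + "\", " + noOfComments + ">";
        }
    }

    /** MapFn step: lift one comment into a singleton Book with a count of 1. */
    public static Map.Entry<String, Book> rec(String id, String comment) {
        return new AbstractMap.SimpleImmutableEntry<>(id, new Book(comment, 1));
    }

    /** Combine step: merge all Books that share a key. */
    public static Map<String, Book> combine(List<Map.Entry<String, Book>> records) {
        Map<String, Book> out = new LinkedHashMap<>();
        for (Map.Entry<String, Book> r : records) {
            out.merge(r.getKey(), r.getValue(), Book::merge);
        }
        return out;
    }

    public static void main(String[] args) {
        // Map-side combine, once per mapper.
        Map<String, Book> mapper1 = combine(Arrays.asList(
                rec("id1", "Hello"), rec("id2", "World"), rec("id1", "I like turtles")));
        Map<String, Book> mapper2 = combine(Arrays.asList(rec("id2", "Goodbye")));

        // "Shuffle": everything the mappers emitted reaches the reducer.
        List<Map.Entry<String, Book>> shuffled = new ArrayList<>(mapper1.entrySet());
        shuffled.addAll(mapper2.entrySet());

        // Reduce-side combine finishes the job with the same function.
        Map<String, Book> reducer = combine(shuffled);

        System.out.println("mapper 1 emits " + mapper1.size() + " records: " + mapper1);
        System.out.println("reducer output: " + reducer);
    }
}
```
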
> >>> >
> >>> >
> >>> > On Thu, Oct 17, 2013 at 8:18 PM, Chandan Biswas <cbiswas1983@gmail.com> wrote:
> >>> >
> >>> > > I have a PTable<String, Comment> and, after the reduce, I get a
> >>> > > PTable<String, Book>.
> >>> > >
> >>> > > T --> Comment{String comment, String author}
> >>> > > U --> Book{String id, String lengthiestComment, int noOfComments}
> >>> > >
> >>> > > But I wanted to do some aggregations on the map side, based on some
> >>> > > logic, instead of doing all aggregations on the reduce side.
> >>> > > Yes, in the worst case the data flow over the network will remain the
> >>> > > same, but sorting will be improved.
> >>> > >
> >>> > > Thanks,
> >>> > > Chandan
> >>> > >
> >>> > >
> >>> > > On Thu, Oct 17, 2013 at 6:46 PM, Josh Wills <jwills@cloudera.com> wrote:
> >>> > >
> >>> > > > On Thu, Oct 17, 2013 at 4:41 PM, Chandan Biswas <cbiswas1983@gmail.com> wrote:
> >>> > > >
> >>> > > > > Yeah, I agree with Micah that it will not eliminate the reduce
> >>> > > > > phase entirely. But the dummy object of U suggested by Josh (or
> >>> > > > > converting to the U type in the map for every record) will not
> >>> > > > > improve performance, because the same number of records will be
> >>> > > > > sorted and aggregated in the reduce phase.
> >>> > > >
> >>> > > >
> >>> > > > I don't think that's true: the records of type U will be combined
> >>> > > > on the map side, which would reduce the amount of data that is
> >>> > > > pushed over the network and improve performance.
> >>> > > >
> >>> > > > Can you give any additional details about what T and U are in this
> >>> > > > scenario? :)
> >>> > > >
> >>> > > >
> >>> > > >
> >>> > > > > But my point is: can we improve it by applying a combiner where
> >>> > > > > the CombineFn produces output of a different type? If we have the
> >>> > > > > same type, we can use the combiner to do some aggregation on the
> >>> > > > > map side, which improves performance. But can we have some
> >>> > > > > mechanism by which the same advantage can be achieved when the
> >>> > > > > CombineFn emits a different type? I think requiring the CombineFn
> >>> > > > > to emit the same type has restricted its use. Can we have a new
> >>> > > > > CombineFn that allows us to output a different type, not only the
> >>> > > > > same type as the input?
> >>> > > > >
> >>> > > > >
> >>> > > > > On Thu, Oct 17, 2013 at 5:05 PM, Josh Wills <jwills@cloudera.com> wrote:
> >>> > > > >
> >>> > > > > > Yeah, my experience in these kinds of situations is that you
> >>> > > > > > need to come up with a "dummy" or singleton version of U for the
> >>> > > > > > case where there is only a single T and do that conversion on the
> >>> > > > > > map side of the job, before the combiner runs. I think Chao had
> >>> > > > > > an issue like this a while ago, where he had a
> >>> > > > > > PTable<String, Double> and wanted to write a combiner that would
> >>> > > > > > return a PTable<String, Collection<Double>>. The solution was to
> >>> > > > > > convert the map-side object to a
> >>> > > > > > PTable<String, Collection<Double>>, where the value on the map
> >>> > > > > > side was a singleton list containing just that double value.
> >>> > > > > > Does that sort of trick work here?
> >>> > > > > >
> >>> > > > > >
> >>> > > > > > On Thu, Oct 17, 2013 at 2:57 PM, Micah Whitacre <mkwhit@gmail.com> wrote:
> >>> > > > > >
> >>> > > > > > > Ok, so the feature you are trying to achieve is the proactive
> >>> > > > > > > combination of data before performing the GBK, like the javadoc
> >>> > > > > > > describes.  Essentially, in that situation the CombineFn is
> >>> > > > > > > being used as a Combiner[1] to combine the data local to that
> >>> > > > > > > mapper before doing the GBK and then further combining the data
> >>> > > > > > > in the reduce operation.  It will not necessarily eliminate the
> >>> > > > > > > need for all processing in the reduce.
> >>> > > > > > >
> >>> > > > > > > If you want to use this functionality you will need to do the
> >>> > > > > > > following:
> >>> > > > > > >
> >>> > > > > > > PTable<S, T> map to PTable<S, U>
> >>> > > > > > > PTable<S, U> gbk to PGT<S, U>
> >>> > > > > > > PGT<S, U> combine to PTable<S, U>
> >>> > > > > > >
> >>> > > > > > > This will take advantage of any optimization provided by the
> >>> > > > > > > CombineFn.
> >>> > > > > > >
> >>> > > > > > > [1] - http://wiki.apache.org/hadoop/HadoopMapReduce
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > On Thu, Oct 17, 2013 at 4:30 PM, Chandan Biswas <cbiswas1983@gmail.com> wrote:
> >>> > > > > > >
> >>> > > > > > > > Hello Micah,
> >>> > > > > > > > Yes, we are using a MapFn now. That aggregation and computation
> >>> > > > > > > > is being done in the reduce phase. Since a CombineFn after a GBK
> >>> > > > > > > > runs on the map side, most of the computations that now run in
> >>> > > > > > > > the reduce phase could be done on the map side. Some smaller
> >>> > > > > > > > aggregations and computations can still be done in the reduce
> >>> > > > > > > > phase. My point was to do some aggregation (and create a new
> >>> > > > > > > > object) in the map phase instead of in the reduce phase.
> >>> > > > > > > >
> >>> > > > > > > > Thanks,
> >>> > > > > > > > Chandan
> >>> > > > > > > >
> >>> > > > > > > >
> >>> > > > > > > > On Thu, Oct 17, 2013 at 3:48 PM, Micah Whitacre <mkwhit@gmail.com> wrote:
> >>> > > > > > > >
> >>> > > > > > > > > Chandan,
> >>> > > > > > > > >    I think what you want will just be a simple MapFn instead
> >>> > > > > > > > > of a CombineFn.  The doc of the CombineFn[1] sounds like what
> >>> > > > > > > > > you want with the statement "A special DoFn
> >>> > > > > > > > > <http://crunch.apache.org/apidocs/0.7.0/org/apache/crunch/DoFn.html>
> >>> > > > > > > > > implementation that converts an Iterable
> >>> > > > > > > > > <http://download.oracle.com/javase/6/docs/api/java/lang/Iterable.html?is-external=true>
> >>> > > > > > > > > of values into a single value" but it is expecting the value
> >>> > > > > > > > > to be of the same type.  Since you want to combine the values
> >>> > > > > > > > > into a different form, it should be fairly trivial to write a
> >>> > > > > > > > > MapFn that converts the Iterable<T> -> U.
> >>> > > > > > > > >
> >>> > > > > > > > > [1] - http://crunch.apache.org/apidocs/0.7.0/org/apache/crunch/CombineFn.html
> >>> > > > > > > > >
> >>> > > > > > > > >
> >>> > > > > > > > > On Thu, Oct 17, 2013 at 3:30 PM, Chandan Biswas <cbiswas1983@gmail.com> wrote:
> >>> > > > > > > > >
> >>> > > > > > > > > > I was refactoring some code and trying to use a CombineFn. But
> >>> > > > > > > > > > when I dug deeper, I found that I can't, because Crunch doesn't
> >>> > > > > > > > > > allow the functionality I need. For example, I have a
> >>> > > > > > > > > > PGroupedTable<S,T>. I wanted to apply a CombineFn<S,T> to it
> >>> > > > > > > > > > and get a PCollection<S,U> instead of T. Right now, CombineFn
> >>> > > > > > > > > > only allows the same type as its return value. The use case
> >>> > > > > > > > > > for this is that there would be some time saved in sorting.
> >>> > > > > > > > > > It's natural that aggregating some objects on the map side
> >>> > > > > > > > > > can create an object of a new, different type.
> >>> > > > > > > > > >
> >>> > > > > > > > > > Any thoughts on it? Am I missing anything? If this can be
> >>> > > > > > > > > > written in a different way using existing functionality,
> >>> > > > > > > > > > please let me know.
> >>> > > > > > > > > >
> >>> > > > > > > > > > Thanks
> >>> > > > > > > > > > Chandan
> >>> > > > > > > > > >
> >>> > > > > > > > >
> >>> > > > > > > >
> >>> > > > > > >
> >>> > > > > >
> >>> > > > > >
> >>> > > > > >
> >>> > > > > > --
> >>> > > > > > Director of Data Science
> >>> > > > > > Cloudera <http://www.cloudera.com>
> >>> > > > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
> >>> > > > > >
> >>> > > > >
> >>> > > >
> >>> > > >
> >>> > > >
> >>> > > > --
> >>> > > > Director of Data Science
> >>> > > > Cloudera <http://www.cloudera.com>
> >>> > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
> >>> > > >
> >>> > >
> >>> >
> >>>
> >>
> >>
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
