crunch-dev mailing list archives

From Chandan Biswas <cbiswas1...@gmail.com>
Subject Re: Process of CombineFn<S,T> returns <S,U>?
Date Sat, 19 Oct 2013 05:00:30 GMT
Thanks, Gabriel, for clarifying it :)


On Fri, Oct 18, 2013 at 11:28 PM, Gabriel Reid <gabriel.reid@gmail.com> wrote:

> Hi Chandan,
>
> Inlined below.
>
> On Sat, Oct 19, 2013 at 3:31 AM, Chandan Biswas <cbiswas1983@gmail.com> wrote:
> > Please correct me if I am wrong. I want to understand more about how Crunch
> > creates MapReduce jobs, as Micah pointed out in an earlier mail.
> > Suppose I am doing the following steps:
> > I have a PTable<K,T> table.
> > PGroupedTable<K,T> grpedTable1=table.groupByKey();
> > Now I am applying a CombineFn on grpedTable1 and getting table2:
> > PTable<K,T> table2=grpedTable1.parallelDo(..,CombineFn<K,T>,..);
> > PGroupedTable<K,T> grpedTable2=table2.groupByKey();
> > PTable<K,U> table3=grpedTable2.parallelDo(..,DoFn,...);
> >
> > So, which type, that of grpedTable2 or that of grpedTable1, will be used for
> > the reducers? My understanding is that the type of grpedTable2 will be used
> > for the reducers and the type of grpedTable1 will be used for shuffling and
> > sorting on the map side. Otherwise, there would be no way to send the
> > Iterable data to the reducers.
> > If that is the case, then the argument for not letting a CombineFn change the
> > type doesn't hold. Otherwise, not changing the type in a CombineFn makes
> > complete sense.
> >
>
> In this example, there would be two MapReduce jobs kicked off. The
> first one would read in table, and then use a Combiner (based on the
> CombineFn) before the reducer (i.e. before the groupByKey), and then
> the same CombineFn within the reducer, to create table2.
>
> Going from table2 would be another MapReduce job that would do nothing
> in the mapper, and execute the supplied DoFn in the reducer.
>
> > It would be awesome to have functionality like Spark's in Crunch, as Josh
> > pointed out, to overcome this.
>
> Just to be clear, adding the "Aggregatable" functionality in Crunch
> won't actually add anything that isn't possible right now -- instead,
> it will just wrap current functionality into a more readable unit (at
> least that's how I see it).
>
> - Gabriel
>
>
> > Thanks,
> > Chandan
> >
> >
> >
> > On Fri, Oct 18, 2013 at 7:34 PM, Josh Wills <jwills@cloudera.com> wrote:
> >
> >> I'm certainly not opposed to having something like this. Spark makes this
> >> distinction via Accumulable vs. Accumulator:
> >>
> >> http://spark.incubator.apache.org/docs/0.8.0/api/core/index.html#org.apache.spark.Accumulable
> >> http://spark.incubator.apache.org/docs/0.8.0/api/core/index.html#org.apache.spark.Accumulator
> >>
> >> Maybe we want something like "Aggregatable<R, T>" to go along with our
> >> Aggregator<T> (which could extend Aggregatable<T, T>)?
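To make Josh's suggestion concrete, here is a minimal plain-Java sketch. `Aggregatable`, `SameTypeAggregatable`, `AverageAggregatable`, `LongSum`, and the `zero`/`add`/`merge` methods are hypothetical names chosen for illustration; they are not part of Crunch or Spark. The sketch assumes that a type-changing aggregation needs three operations: a starting value, a way to fold one input T into a partial result R, and a way to merge two partial results R. The merge step is what would let partial results from different mappers be combined again in the reducer.

```java
import java.util.List;

/** Hypothetical: folds inputs of type T into a result of type R (not a Crunch or Spark API). */
interface Aggregatable<R, T> {
    R zero();                  // empty partial result
    R add(R partial, T value); // fold one input value into a partial result
    R merge(R left, R right);  // combine two partial results, e.g. from two mappers
}

/** Hypothetical same-type case, mirroring "Aggregator<T> extends Aggregatable<T, T>". */
interface SameTypeAggregatable<T> extends Aggregatable<T, T> {
    @Override
    default T add(T partial, T value) {
        return merge(partial, value); // when R == T, adding a value is just merging it
    }
}

/** Same-type example: a running sum of longs. */
final class LongSum implements SameTypeAggregatable<Long> {
    public Long zero() { return 0L; }
    public Long merge(Long left, Long right) { return left + right; }
}

/** Type-changing example (R != T): the average of doubles, carried as {sum, count}. */
final class AverageAggregatable implements Aggregatable<double[], Double> {
    public double[] zero() { return new double[] {0.0, 0.0}; }
    public double[] add(double[] p, Double v) { return new double[] {p[0] + v, p[1] + 1}; }
    public double[] merge(double[] a, double[] b) { return new double[] {a[0] + b[0], a[1] + b[1]}; }

    static double result(double[] partial) { return partial[0] / partial[1]; }

    /** Folds each simulated mapper's values locally, then merges the partials as a reducer would. */
    static double averageAcrossMappers(List<List<Double>> mapperInputs) {
        AverageAggregatable agg = new AverageAggregatable();
        double[] total = agg.zero();
        for (List<Double> values : mapperInputs) {
            double[] local = agg.zero();
            for (Double v : values) {
                local = agg.add(local, v);
            }
            total = agg.merge(total, local);
        }
        return result(total);
    }
}
```

The average is a classic case where R differs from T: averaging per-mapper averages would generally give the wrong answer, so the partial result has to carry both a sum and a count.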
> >>
> >>
> >>
> >> On Fri, Oct 18, 2013 at 1:36 PM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
> >>
> >> > This use case (map/combine <K,V> to <K,U>) seems to come up
> >> > repeatedly. The solution (map <K,V> to <K, Collection<V>> and then
> >> > combine) works but is also pretty unintuitive.
> >> >
> >> > Any thoughts on adding a util in Crunch to do this? It would basically
> >> > just need to be a static util method that takes a MapFn<Pair<K,V>, Pair<K,U>>
> >> > and a CombineFn<K,U> and would take care of the singleton collection
> >> > mapping stuff internally. On the one hand I'm thinking that this could
> >> > be pretty useful, but I'm not sure if it would make things more
> >> > intuitive or possibly have the reverse effect.
> >> >
> >> > Any opinions? I'm up for putting it together if people think it's worth
> >> > it.
> >> >
> >> > - Gabriel
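A minimal in-memory sketch of the kind of util Gabriel describes, assuming each mapper can be modeled as a plain list of key/value entries. `MapThenCombine.run` is a hypothetical name, and a `Function<V, U>` plus a `BinaryOperator<U>` stand in for the MapFn and CombineFn. This simulates the data flow (map, map-side combine, shuffle, reduce-side combine); it is not Crunch code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Hypothetical model of "map V -> U, then combine U's" (not a Crunch API). */
final class MapThenCombine {
    static <K extends Comparable<K>, V, U> Map<K, U> run(
            List<List<Map.Entry<K, V>>> mappers,
            Function<V, U> mapFn,
            BinaryOperator<U> combineFn) {
        // Map phase + combiner: each mapper converts its values to U and combines them per key locally.
        List<Map<K, U>> mapOutputs = new ArrayList<>();
        for (List<Map.Entry<K, V>> mapper : mappers) {
            Map<K, U> local = new TreeMap<>();
            for (Map.Entry<K, V> e : mapper) {
                local.merge(e.getKey(), mapFn.apply(e.getValue()), combineFn);
            }
            mapOutputs.add(local);
        }
        // Shuffle + reduce: the combiner outputs, already of type U, are combined again per key.
        Map<K, U> reduced = new TreeMap<>();
        for (Map<K, U> local : mapOutputs) {
            local.forEach((k, u) -> reduced.merge(k, u, combineFn));
        }
        return reduced;
    }
}
```

For example, mapping every value to `1` and combining with `Integer::sum` gives a per-key count, with each mapper's counts pre-summed before the shuffle.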
> >> >
> >> >
> >> > On Fri, Oct 18, 2013 at 4:14 PM, Micah Whitacre <mkwhit@gmail.com> wrote:
> >> > > Thinking about the technical issues, at first glance you could say the
> >> > > restriction is just the way the Java generics are written for the CombineFn
> >> > > class, but if you think about what is actually happening, it would be
> >> > > awkward to support changing types in the CombineFn, especially when it is
> >> > > paired with a GroupByKey. As I showed in the example, the CombineFn
> >> > > essentially bookends the GBK operation by performing processing on the
> >> > > types before and after the sorting. The GBK's types describe the output of
> >> > > the map phase and the input to the reduce. If the CombineFn changed the
> >> > > types, then the output wouldn't match the types described by the GBK. I'm
> >> > > guessing this could lead to a number of problems when computing the types
> >> > > and planning the job.
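Micah's point can be illustrated with a small sketch (plain Java, not Crunch code; `CombineTwice` and its methods are hypothetical). The same combine function runs on the map side and then again in the reducer, where its input is the list of map-side outputs, so it has to accept values of its own output type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch: why a function used as both combiner and reducer must be List<T> -> T. */
final class CombineTwice {
    static <T> T combinerThenReducer(List<List<T>> valuesPerMapper, Function<List<T>, T> combine) {
        List<T> shuffled = new ArrayList<>();
        for (List<T> mapperValues : valuesPerMapper) {
            shuffled.add(combine.apply(mapperValues)); // map side: each combiner emits a T
        }
        // Reduce side: the same function now runs over the combiner outputs. With a
        // Function<List<T>, U> where U != T, this call would not type-check, which is
        // the mismatch with the GBK's declared types that Micah describes.
        return combine.apply(shuffled);
    }

    static long sum(List<Long> values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return total;
    }
}
```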
> >> > >
> >> > >
> >> > > On Fri, Oct 18, 2013 at 8:55 AM, Micah Whitacre <mkwhit@gmail.com> wrote:
> >> > >
> >> > >> I'm not sure I follow how there is extra effort involved. Are you
> >> > >> talking about development effort or processing effort? In terms of
> >> > >> development effort, in both cases you need to write code that translates
> >> > >> T to U and combines the values. The difference is whether that logic
> >> > >> exists inside a single DoFn or is split into a MapFn + CombineFn. So the
> >> > >> development effort should be the same.
> >> > >>
> >> > >>
> >> > >> On Fri, Oct 18, 2013 at 8:11 AM, Chandan Biswas <cbiswas1983@gmail.com> wrote:
> >> > >>
> >> > >>> Yeah, I see what you are talking about. But it will take extra effort
> >> > >>> to convert to type U. So, is there any specific reason CombineFn was
> >> > >>> originally designed not to allow a different return type? Were there
> >> > >>> any constraints (design or complexity) that led to this restriction?
> >> > >>> Thanks,
> >> > >>>
> >> > >>>
> >> > >>> On Thu, Oct 17, 2013 at 8:47 PM, Micah Whitacre <mkwhit@gmail.com> wrote:
> >> > >>>
> >> > >>> > Chandan,
> >> > >>> >    So let's apply your situation to the types and conversion that is
> >> > >>> > proposed and break down where the logic will be applied. Say we have
> >> > >>> > a PCollection like the following:
> >> > >>> >
> >> > >>> > Mapper 1:
> >> > >>> > <id1, "Hello">
> >> > >>> > <id2, "World">
> >> > >>> > <id1, "I like turtles">
> >> > >>> >
> >> > >>> > Mapper 2:
> >> > >>> > <id2, "Goodbye">
> >> > >>> >
> >> > >>> > This will be represented by the PTable<String, Comment>. We then apply
> >> > >>> > a MapFn to transform it into a PTable<String, Book>, and we'd get the
> >> > >>> > following in our PCollection:
> >> > >>> >
> >> > >>> > Mapper 1:
> >> > >>> > <id1, <"Hello", 1>>
> >> > >>> > <id2, <"World", 1>>
> >> > >>> > <id1, <"I like turtles", 1>>
> >> > >>> >
> >> > >>> > Mapper 2:
> >> > >>> > <id2, <"Goodbye", 1>>
> >> > >>> >
> >> > >>> > Then if we were to use the GBK + CombineFn, the output of the map
> >> > >>> > phase would be:
> >> > >>> >
> >> > >>> > Mapper 1:
> >> > >>> > <id2, <"World", 1>>
> >> > >>> > <id1, <"I like turtles", 2>>
> >> > >>> >
> >> > >>> > Mapper 2:
> >> > >>> > <id2, <"Goodbye", 1>>
> >> > >>> >
> >> > >>> > Notice that Mapper 1 would only be emitting 2 items instead of 3, and
> >> > >>> > therefore less data is sent over the wire and has to be sorted. Also,
> >> > >>> > in the reducer, after the GBK is completed, the CombineFn would finish
> >> > >>> > its work and you'd get the following:
> >> > >>> >
> >> > >>> > Reducer 1:
> >> > >>> > <id2, <"Goodbye", 2>>
> >> > >>> > <id1, <"I like turtles", 2>>
> >> > >>> >
> >> > >>> > The only cases where this would not improve performance are if you
> >> > >>> > never emit data for the same key from the same mapper, or if your
> >> > >>> > mapper doesn't reduce the size of the data.
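The walk-through above can be reproduced in plain Java. This is a simulation of the data flow, not Crunch code: `LengthiestCommentExample` and its methods are hypothetical, and `Book` keeps only the two fields the example prints (the lengthiest comment and the comment count). The keys and comments are the ones from Micah's example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of the MapFn + GBK + CombineFn example (not Crunch code). */
final class LengthiestCommentExample {
    static final class Book {
        final String lengthiestComment;
        final int noOfComments;
        Book(String lengthiestComment, int noOfComments) {
            this.lengthiestComment = lengthiestComment;
            this.noOfComments = noOfComments;
        }
    }

    // MapFn step: every comment becomes a singleton Book with a count of 1.
    static Book toBook(String comment) {
        return new Book(comment, 1);
    }

    // CombineFn step (Book in, Book out): keep the strictly longer comment
    // (ties keep the first one) and add the counts.
    static Book combine(Book a, Book b) {
        String longer = b.lengthiestComment.length() > a.lengthiestComment.length()
                ? b.lengthiestComment : a.lengthiestComment;
        return new Book(longer, a.noOfComments + b.noOfComments);
    }

    // Combiner: combine the Books emitted by one mapper, per key.
    static Map<String, Book> combineLocally(List<Map.Entry<String, String>> mapperRecords) {
        Map<String, Book> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> kv : mapperRecords) {
            out.merge(kv.getKey(), toBook(kv.getValue()), LengthiestCommentExample::combine);
        }
        return out;
    }

    // Reducer: combine the combiner outputs from all mappers, per key.
    static Map<String, Book> reduce(List<Map<String, Book>> mapOutputs) {
        Map<String, Book> out = new LinkedHashMap<>();
        for (Map<String, Book> m : mapOutputs) {
            m.forEach((k, b) -> out.merge(k, b, LengthiestCommentExample::combine));
        }
        return out;
    }
}
```

Running `combineLocally` on Mapper 1's three records yields two Books, matching the "2 items instead of 3" in the example, and `reduce` then produces `<"Goodbye", 2>` for id2 and `<"I like turtles", 2>` for id1.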
> >> > >>> >
> >> > >>> >
> >> > >>> > On Thu, Oct 17, 2013 at 8:18 PM, Chandan Biswas <cbiswas1983@gmail.com> wrote:
> >> > >>> >
> >> > >>> > > I have a PTable<String, Comment> and, after the reduce, I get a
> >> > >>> > > PTable<String, Book>, where
> >> > >>> > >
> >> > >>> > > T --> Comment{String comment, String author}
> >> > >>> > > U --> Book{String id, String lengthiestComment, int noOfComments}
> >> > >>> > >
> >> > >>> > > But I wanted to do some of the aggregation on the map side, based on
> >> > >>> > > some logic, instead of doing all of it on the reduce side.
> >> > >>> > > Yes, in the worst case the data flowing over the network will remain
> >> > >>> > > the same, but sorting will be improved.
> >> > >>> > >
> >> > >>> > > Thanks,
> >> > >>> > > Chandan
> >> > >>> > >
> >> > >>> > >
> >> > >>> > > On Thu, Oct 17, 2013 at 6:46 PM, Josh Wills <jwills@cloudera.com> wrote:
> >> > >>> > >
> >> > >>> > > > On Thu, Oct 17, 2013 at 4:41 PM, Chandan Biswas <cbiswas1983@gmail.com> wrote:
> >> > >>> > > >
> >> > >>> > > > > Yeah, I agree with Micah that it will not eliminate the reduce
> >> > >>> > > > > phase entirely. But the dummy object of U suggested by Josh (or
> >> > >>> > > > > converting every record to type U in the map) will not improve
> >> > >>> > > > > performance, because the same number of records will be sorted
> >> > >>> > > > > and aggregated in the reduce phase.
> >> > >>> > > >
> >> > >>> > > >
> >> > >>> > > > I don't think that's true -- the records of type U will be combined
> >> > >>> > > > on the map side, which would reduce the amount of data that is
> >> > >>> > > > pushed over the network and improve performance.
> >> > >>> > > >
> >> > >>> > > > Can you give any additional details about what T and U are in this
> >> > >>> > > > scenario? :)
> >> > >>> > > >
> >> > >>> > > >
> >> > >>> > > >
> >> > >>> > > > > But my point is: can we improve it by applying a combiner where
> >> > >>> > > > > the CombineFn outputs a different type? If we have the same type,
> >> > >>> > > > > we can use the combiner to do some aggregation on the map side,
> >> > >>> > > > > which improves performance. But can we have some mechanism by which
> >> > >>> > > > > the same advantage can be achieved when the CombineFn emits a
> >> > >>> > > > > different type? I think requiring a CombineFn to emit the same type
> >> > >>> > > > > has restricted its use. Can we have a new CombineFn that allows us
> >> > >>> > > > > to output a different type, not only the same type as the input?
> >> > >>> > > > >
> >> > >>> > > > >
> >> > >>> > > > > On Thu, Oct 17, 2013 at 5:05 PM, Josh Wills <jwills@cloudera.com> wrote:
> >> > >>> > > > >
> >> > >>> > > > > > Yeah, my experience in these kinds of situations is that you need
> >> > >>> > > > > > to come up with a "dummy" or singleton version of U for the case
> >> > >>> > > > > > where there is only a single T, and do that conversion on the map
> >> > >>> > > > > > side of the job, before the combiner runs. I think Chao had an
> >> > >>> > > > > > issue like this a while ago, where he had a PTable<String, Double>
> >> > >>> > > > > > and wanted to write a combiner that would return a
> >> > >>> > > > > > PTable<String, Collection<Double>>. The solution was to convert the
> >> > >>> > > > > > map-side object to a PTable<String, Collection<Double>>, where the
> >> > >>> > > > > > value on the map side was a singleton list containing just that
> >> > >>> > > > > > double value. Does that sort of trick work here?
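Josh's singleton-collection trick, sketched in plain Java for Chao's `Double` case (the helper names are hypothetical; this is not Crunch code). Because every map-side value is already a `Collection<Double>`, the concatenating combine function has the same input and output type and can therefore run as both combiner and reducer.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the singleton-collection trick (not Crunch code). */
final class SingletonCollectionTrick {
    // Map side: wrap each Double in a one-element collection so it already has the combined type.
    static Collection<Double> toSingleton(double value) {
        return Collections.singletonList(value);
    }

    // Combine: Collection<Double> in, Collection<Double> out, usable as combiner and reducer.
    static Collection<Double> concat(Iterable<Collection<Double>> parts) {
        List<Double> all = new ArrayList<>();
        for (Collection<Double> part : parts) {
            all.addAll(part);
        }
        return all;
    }
}
```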
> >> > >>> > > > > >
> >> > >>> > > > > >
> >> > >>> > > > > > On Thu, Oct 17, 2013 at 2:57 PM, Micah Whitacre <mkwhit@gmail.com> wrote:
> >> > >>> > > > > >
> >> > >>> > > > > > > Ok, so the feature you are trying to achieve is the proactive
> >> > >>> > > > > > > combination of data before performing the GBK, as the javadoc
> >> > >>> > > > > > > describes. Essentially, in that situation the CombineFn is being
> >> > >>> > > > > > > used as a Combiner[1] to combine the data local to that mapper
> >> > >>> > > > > > > before doing the GBK, and then to further combine the data in the
> >> > >>> > > > > > > reduce operation. It will not necessarily eliminate the need for
> >> > >>> > > > > > > all processing in the reduce.
> >> > >>> > > > > > >
> >> > >>> > > > > > > If you want to use this functionality you will need to do the
> >> > >>> > > > > > > following:
> >> > >>> > > > > > >
> >> > >>> > > > > > > PTable<S, T> map to PTable<S, U>
> >> > >>> > > > > > > PTable<S, U> gbk to PGT<S, U>
> >> > >>> > > > > > > PGT<S, U> combine to PTable<S, U>
> >> > >>> > > > > > >
> >> > >>> > > > > > > This will take advantage of any optimization provided by the
> >> > >>> > > > > > > CombineFn.
> >> > >>> > > > > > >
> >> > >>> > > > > > > [1] - http://wiki.apache.org/hadoop/HadoopMapReduce
> >> > >>> > > > > > >
> >> > >>> > > > > > >
> >> > >>> > > > > > >
> >> > >>> > > > > > > On Thu, Oct 17, 2013 at 4:30
PM, Chandan Biswas <
> >> > >>> > > > cbiswas1983@gmail.com
> >> > >>> > > > > > > >wrote:
> >> > >>> > > > > > >
> >> > >>> > > > > > > > Hello Micah,
> >> > >>> > > > > > > > Yes we are using MapFn
now. That aggregation and
> >> > >>> computation is
> >> > >>> > > > being
> >> > >>> > > > > > > done
> >> > >>> > > > > > > > in reduce phase. As CombineFn
after GBK runs into
> map
> >> > side,
> >> > >>> > then
> >> > >>> > > > > those
> >> > >>> > > > > > > most
> >> > >>> > > > > > > > computations can be done
in map side which are now
> >> > running
> >> > >>> in
> >> > >>> > > > reduce
> >> > >>> > > > > > > phase.
> >> > >>> > > > > > > > Some smaller aggregations
and computations can be
> done
> >> on
> >> > >>> > reduce
> >> > >>> > > > > phase.
> >> > >>> > > > > > > > My point was to do some
aggregation (and create a
> new
> >> > >>> object)
> >> > >>> > in
> >> > >>> > > > map
> >> > >>> > > > > > > phase
> >> > >>> > > > > > > > instead of in reduce phase.
> >> > >>> > > > > > > >
> >> > >>> > > > > > > > Thanks,
> >> > >>> > > > > > > > Chandan
> >> > >>> > > > > > > >
> >> > >>> > > > > > > >
> >> > >>> > > > > > > > On Thu, Oct 17, 2013 at
3:48 PM, Micah Whitacre <
> >> > >>> > > mkwhit@gmail.com>
> >> > >>> > > > > > > wrote:
> >> > >>> > > > > > > >
> >> > >>> > > > > > > > > Chandan,
> >> > >>> > > > > > > > >    I think what you
are wanting will just be a
> simple
> >> > >>> MapFn
> >> > >>> > > > instead
> >> > >>> > > > > > of
> >> > >>> > > > > > > a
> >> > >>> > > > > > > > > CombineFn.  The doc
of the CombineFn[1] sounds
> like
> >> > what
> >> > >>> you
> >> > >>> > > want
> >> > >>> > > > > > with
> >> > >>> > > > > > > > the
> >> > >>> > > > > > > > > statement "A special
> >> > >>> > > > > > > > > DoFn<
> >> > >>> > > > > > >
> >> > >>> >
> http://crunch.apache.org/apidocs/0.7.0/org/apache/crunch/DoFn.html
> >> > >>> > > >
> >> > >>> > > > > > > > > implementation
> >> > >>> > > > > > > > > that converts an
> >> > >>> > > > > > > > > Iterable<
> >> > >>> > > > > > > > >
> >> > >>> > > > > > > >
> >> > >>> > > > > > >
> >> > >>> > > > > >
> >> > >>> > > > >
> >> > >>> > > >
> >> > >>> > >
> >> > >>> >
> >> > >>>
> >> >
> >>
> http://download.oracle.com/javase/6/docs/api/java/lang/Iterable.html?is-external=true
> >> > >>> > > > > > > > > >
> >> > >>> > > > > > > > > of
> >> > >>> > > > > > > > > values into a single
value" but it is expecting
> the
> >> > value
> >> > >>> to
> >> > >>> > be
> >> > >>> > > > of
> >> > >>> > > > > > the
> >> > >>> > > > > > > > same
> >> > >>> > > > > > > > > time.  Since you are
wanting to combine the values
> >> > into a
> >> > >>> > > > different
> >> > >>> > > > > > > form
> >> > >>> > > > > > > > it
> >> > >>> > > > > > > > > should be fairly trivial
to write a MapFn that
> >> converts
> >> > >>> the
> >> > >>> > > > > > Iterable<T>
> >> > >>> > > > > > > > ->
> >> > >>> > > > > > > > > U.
> >> > >>> > > > > > > > >
> >> > >>> > > > > > > > > [1] -
> >> > >>> > > > > > > > >
> >> > >>> > > > > > >
> >> > >>> > > > >
> >> > >>> > >
> >> > >>>
> >> >
> http://crunch.apache.org/apidocs/0.7.0/org/apache/crunch/CombineFn.html
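Micah's alternative, a single conversion from `Iterable<T>` to `U` after the GBK, might look like this in plain Java. `GroupedToBook` is hypothetical, and `Comment` and `Book` mirror the fields Chandan lists in his Oct 17, 8:18 PM message. In Crunch this logic would sit in a MapFn over the grouped table's `Pair<String, Iterable<Comment>>` values; here it is just a static method, and no map-side combining takes place.

```java
import java.util.List;

/** Hypothetical sketch: one reduce-side conversion from Iterable<Comment> to Book (not Crunch code). */
final class GroupedToBook {
    static final class Comment {
        final String comment;
        final String author;
        Comment(String comment, String author) {
            this.comment = comment;
            this.author = author;
        }
    }

    static final class Book {
        final String id;
        final String lengthiestComment;
        final int noOfComments;
        Book(String id, String lengthiestComment, int noOfComments) {
            this.id = id;
            this.lengthiestComment = lengthiestComment;
            this.noOfComments = noOfComments;
        }
    }

    // Called once per key after the GBK, with every comment for that key.
    static Book toBook(String id, Iterable<Comment> comments) {
        String longest = null;
        int count = 0;
        for (Comment c : comments) {
            count++;
            if (longest == null || c.comment.length() > longest.length()) {
                longest = c.comment;
            }
        }
        return new Book(id, longest, count);
    }
}
```

The trade-off discussed in the thread is visible here: the code is simple, but every `Comment` for a key travels to the reducer, whereas the MapFn + CombineFn route shrinks the data before the shuffle.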
> >> > >>> > > > > > > > >
> >> > >>> > > > > > > > >
> >> > >>> > > > > > > > > On Thu, Oct 17, 2013
at 3:30 PM, Chandan Biswas <
> >> > >>> > > > > > cbiswas1983@gmail.com
> >> > >>> > > > > > > > > >wrote:
> >> > >>> > > > > > > > >
> >> > >>> > > > > > > > > > I was trying
to refactoring some stuffs and
> trying
> >> to
> >> > >>> use
> >> > >>> > > > > > combineFn.
> >> > >>> > > > > > > > > > But when I went
into deeper, found that I can't
> do
> >> > it as
> >> > >>> > > Crunch
> >> > >>> > > > > > > doesn't
> >> > >>> > > > > > > > > > allow it the
functionality I needed. For
> example, I
> >> > >>> have a
> >> > >>> > > > > > > > > > PGroupedTable<S,T>.
I wanted to apply
> >> CombineFn<S,T>
> >> > on
> >> > >>> it
> >> > >>> > > and
> >> > >>> > > > > > wanted
> >> > >>> > > > > > > > to
> >> > >>> > > > > > > > > > get PCollection<S,U>
instead of T. Right now,
> >> > CombineFn
> >> > >>> > > allows
> >> > >>> > > > > only
> >> > >>> > > > > > > > same
> >> > >>> > > > > > > > > > type as return
value. The use case of this need
> is
> >> > that
> >> > >>> > there
> >> > >>> > > > > will
> >> > >>> > > > > > be
> >> > >>> > > > > > > > > some
> >> > >>> > > > > > > > > > time saving in
sorting. It's natural that when
> >> > >>> aggregating
> >> > >>> > > some
> >> > >>> > > > > > > objects
> >> > >>> > > > > > > > > at
> >> > >>> > > > > > > > > > map side can
create a new different type object.
> >> > >>> > > > > > > > > >
> >> > >>> > > > > > > > > > Any thought on
it? Am I missing any thing? If
> this
> >> > can
> >> > >>> be
> >> > >>> > > > written
> >> > >>> > > > > > in
> >> > >>> > > > > > > > > > different way
using existing way please let me
> >> know.
> >> > >>> > > > > > > > > >
> >> > >>> > > > > > > > > > Thanks
> >> > >>> > > > > > > > > > Chandan
> >> > >>> > > > > > > > > >
> >> > >>> > > > > > > > >
> >> > >>> > > > > > > >
> >> > >>> > > > > > >
> >> > >>> > > > > >
> >> > >>> > > > > >
> >> > >>> > > > > >
> >> > >>> > > > > > --
> >> > >>> > > > > > Director of Data Science
> >> > >>> > > > > > Cloudera <http://www.cloudera.com>
> >> > >>> > > > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
> >> > >>> > > > > >
> >> > >>> > > > >
> >> > >>> > > >
> >> > >>> > > >
> >> > >>> > > >
> >> > >>> > > >
> >> > >>> > >
> >> > >>> >
> >> > >>>
> >> > >>
> >> > >>
> >> >
> >>
> >>
> >>
> >>
>
