crunch-dev mailing list archives

From Gabriel Reid <gabriel.r...@gmail.com>
Subject Re: Process of CombineFn<S,T> returns <S,U>?
Date Sat, 19 Oct 2013 04:28:34 GMT
Hi Chandan,

Inlined below.

On Sat, Oct 19, 2013 at 3:31 AM, Chandan Biswas <cbiswas1983@gmail.com> wrote:
> Please correct me if I am wrong. I want to understand better how Crunch
> creates MapReduce jobs, as Micah pointed out in an earlier mail.
> Suppose I perform the following sequence of operations:
> I have a PTable<K,T> table.
> PGroupedTable<K,T> grpedTable1=table.groupByKey();
> Now I apply a CombineFn to grpedTable1 and get table2:
> PTable<K,T> table2=grpedTable1.parallelDo(..,CombineFn<K,T>,..);
> PGroupedTable<K,T> grpedTable2=table2.groupByKey();
> PTable<K,U> table3=grpedTable2.parallelDo(..,DoFn,...);
>
> So, which type, grpedTable2's or grpedTable1's, will be used for the reducers?
> My understanding is that the type of grpedTable2 will be used for the reducers
> and the type of grpedTable1 will be used for shuffle/sorting on the map side.
> Otherwise, there would be no way to send the Iterable data to the reducers.
> If that is the case, then the argument for CombineFn not changing the type
> doesn't hold. Otherwise, not changing the type in CombineFn makes complete
> sense.
>

In this example, there would be two MapReduce jobs kicked off. The
first one would read in table, and then use a Combiner (based on the
CombineFn) before the reducer (i.e. before the groupByKey), and then
the same CombineFn within the reducer, to create table2.

Going from table2 would be another MapReduce job that would do nothing
in the mapper, and execute the supplied DoFn in the reducer.
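
To make the first job a bit more concrete, here is a rough plain-Java
simulation of where the combine function runs. It does not use the Crunch
API at all: groupByKey, combine, runFirstJob and pair are made-up helper
names for this sketch, and the values are simple counts rather than real
records. The point is only that the same type-preserving function is
applied once per mapper and then again in the reducer.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

final class CombinerPlacementSketch {

    // Hypothetical helper: builds one (key, value) record.
    static <V> Map.Entry<String, V> pair(String key, V value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Stand-in for the shuffle/sort behind groupByKey: collect values per key.
    static <V> Map<String, List<V>> groupByKey(List<Map.Entry<String, V>> pairs) {
        Map<String, List<V>> grouped = new TreeMap<>();
        for (Map.Entry<String, V> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    // Stand-in for a CombineFn: fold each key's values into one value of the same type.
    static <V> List<Map.Entry<String, V>> combine(Map<String, List<V>> grouped, BinaryOperator<V> fn) {
        List<Map.Entry<String, V>> out = new ArrayList<>();
        for (Map.Entry<String, List<V>> g : grouped.entrySet()) {
            // Groups are never empty here, so reduce always yields a value.
            out.add(pair(g.getKey(), g.getValue().stream().reduce(fn).get()));
        }
        return out;
    }

    // Simulates the first job: combine locally per mapper, shuffle, combine again.
    static Map<String, Integer> runFirstJob(List<List<Map.Entry<String, Integer>>> mapperOutputs) {
        BinaryOperator<Integer> sum = Integer::sum;
        List<Map.Entry<String, Integer>> shuffled = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> local : mapperOutputs) {
            shuffled.addAll(combine(groupByKey(local), sum)); // map-side Combiner
        }
        Map<String, Integer> table2 = new TreeMap<>();
        for (Map.Entry<String, Integer> e : combine(groupByKey(shuffled), sum)) { // reducer
            table2.put(e.getKey(), e.getValue());
        }
        return table2;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapper1 =
                Arrays.asList(pair("id1", 1), pair("id2", 1), pair("id1", 1));
        List<Map.Entry<String, Integer>> mapper2 = Arrays.asList(pair("id2", 1));
        System.out.println(runFirstJob(Arrays.asList(mapper1, mapper2))); // prints {id1=2, id2=2}
    }
}
```

Because the output of the map-side pass is fed straight back into the same
function in the reducer, the function has to take and return the same
value type.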

> It would be great to have functionality like Spark's, as Josh pointed
> out, to overcome this limitation in Crunch.

Just to be clear, adding the "Aggregatable" functionality in Crunch
won't actually add anything that isn't possible right now -- instead,
it will just wrap current functionality into a more readable unit (at
least that's how I see it).
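
As a rough illustration of what I mean by "wrapping", here is a plain-Java
sketch of the pattern (map each T to a singleton U, then combine U values
per key with a function that keeps the type). This is not Crunch code and
not a proposed signature: mapThenCombine, pair and merge are hypothetical
names, and Book just mirrors the fields Chandan described earlier in the
thread.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

final class MapThenCombineSketch {

    // U in the thread's example: a per-key summary of comments.
    static final class Book {
        final String lengthiestComment;
        final int noOfComments;

        Book(String lengthiestComment, int noOfComments) {
            this.lengthiestComment = lengthiestComment;
            this.noOfComments = noOfComments;
        }
    }

    // Hypothetical helper: builds one (key, value) record.
    static <K, V> Map.Entry<K, V> pair(K key, V value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Hypothetical wrapper: lift each T into a singleton U, then fold the U
    // values per key with a type-preserving combine function.
    static <K, T, U> Map<K, U> mapThenCombine(
            List<Map.Entry<K, T>> records,
            Function<T, U> toSingleton,
            BinaryOperator<U> combine) {
        Map<K, U> out = new LinkedHashMap<>();
        for (Map.Entry<K, T> r : records) {
            out.merge(r.getKey(), toSingleton.apply(r.getValue()), combine);
        }
        return out;
    }

    // Combine two Books: keep the longer comment and add the counts.
    static Book merge(Book a, Book b) {
        String longest = a.lengthiestComment.length() >= b.lengthiestComment.length()
                ? a.lengthiestComment : b.lengthiestComment;
        return new Book(longest, a.noOfComments + b.noOfComments);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> comments = Arrays.asList(
                pair("id1", "Hello"), pair("id2", "World"),
                pair("id1", "I like turtles"), pair("id2", "Goodbye"));
        Map<String, Book> books =
                mapThenCombine(comments, c -> new Book(c, 1), MapThenCombineSketch::merge);
        for (Map.Entry<String, Book> e : books.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().lengthiestComment
                    + ", " + e.getValue().noOfComments);
        }
    }
}
```

The caller only supplies the T-to-U conversion and the U-with-U combine
step; the singleton mapping that people find unintuitive is hidden inside
the helper.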

- Gabriel


> Thanks,
> Chandan
>
>
>
> On Fri, Oct 18, 2013 at 7:34 PM, Josh Wills <jwills@cloudera.com> wrote:
>
>> I'm certainly not opposed to having something like this. Spark makes this
>> distinction via Accumulable vs. Accumulator:
>>
>>
>> http://spark.incubator.apache.org/docs/0.8.0/api/core/index.html#org.apache.spark.Accumulable
>>
>> http://spark.incubator.apache.org/docs/0.8.0/api/core/index.html#org.apache.spark.Accumulator
>>
>> Maybe we want something like "Aggregatable<R, T>" to go along with our
>> Aggregator<T> (which could extend Aggregatable<T, T>)?
>>
>>
>>
>> On Fri, Oct 18, 2013 at 1:36 PM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
>>
>> > This use case (map/combine <K,V> to <K,U>) seems to come up
>> > repeatedly. The solution (map <K,V> to <K, Collection<V>> and then
>> > combine) works but is also pretty unintuitive.
>> >
>> > Any thoughts on adding a util in Crunch to do this? It would basically
>> > just need to be a static util method that takes a MapFn<<K,V><K,U>>
>> > and a CombineFn<K,U> and would take care of the singleton collection
>> > mapping stuff internally. On the one hand I'm thinking that this could
>> > be pretty useful, but I'm not sure if it would make things more
>> > intuitive or possibly have the reverse effect.
>> >
>> > Any opinions? I'm up for putting it together if people think it's worth it.
>> >
>> > - Gabriel
>> >
>> >
>> > On Fri, Oct 18, 2013 at 4:14 PM, Micah Whitacre <mkwhit@gmail.com> wrote:
>> > > Thinking about the technical issues, at first glance you could say the
>> > > restriction is just the way the Java generics are written for the CombineFn
>> > > class, but if you think about what is actually happening it would be awkward
>> > > to support changing types in the CombineFn, especially when it is paired
>> > > with a GroupByKey.  As I showed in the example, the CombineFn essentially
>> > > bookends the GBK operation by performing processing on the types before and
>> > > after the sorting.  The GBK's types describe the output of the map phase
>> > > and the input to the reduce.  If the CombineFn changed the types then the
>> > > output wouldn't match the types described by the GBK.  I'm guessing this
>> > > could lead to a number of problems trying to compute the types and plan for
>> > > the job.
>> > >
>> > >
>> > > On Fri, Oct 18, 2013 at 8:55 AM, Micah Whitacre <mkwhit@gmail.com> wrote:
>> > >
>> > >> I'm not sure I follow how there is extra effort involved.  Are you talking
>> > >> about development effort or processing effort?  In terms of development
>> > >> effort, in both cases you need to write code that translates T to U and
>> > >> combines the values.  The difference is whether that logic exists inside of
>> > >> a single DoFn or is split into a MapFn + CombineFn.  So the development
>> > >> effort should be the same.
>> > >>
>> > >>
>> > >> On Fri, Oct 18, 2013 at 8:11 AM, Chandan Biswas <cbiswas1983@gmail.com> wrote:
>> > >>
>> > >>> Yeah, I see what you are talking about. But it will take extra effort to
>> > >>> convert to the U type. So, is there any specific reason why CombineFn
>> > >>> was originally designed not to allow a different return type? Were
>> > >>> there any constraints (design / complexity) that led to this behavior?
>> > >>> Thanks,
>> > >>>
>> > >>>
>> > >>> On Thu, Oct 17, 2013 at 8:47 PM, Micah Whitacre <mkwhit@gmail.com> wrote:
>> > >>>
>> > >>> > Chandan,
>> > >>> >    So let's apply your situation to the types and conversion that is
>> > >>> > proposed and break it down where logic will be applied.  Say we have a
>> > >>> > PCollection that is like the following:
>> > >>> >
>> > >>> > Mapper 1:
>> > >>> > <id1, "Hello">
>> > >>> > <id2, "World">
>> > >>> > <id1, "I like turtles">
>> > >>> >
>> > >>> > Mapper 2
>> > >>> > <id2, "Goodbye">
>> > >>> >
>> > >>> > This will be represented by the PTable<String, Comment>.  We then apply a
>> > >>> > MapFn to transform it into PTable<String, Book> and we'd get the following
>> > >>> > in our PCollection:
>> > >>> >
>> > >>> > Mapper 1
>> > >>> > <id1, <"Hello", 1>>
>> > >>> > <id2, <"World", 1>>
>> > >>> > <id1, <"I like turtles", 1>>
>> > >>> >
>> > >>> > Mapper 2
>> > >>> > <id2, <"Goodbye", 1>>
>> > >>> >
>> > >>> > Then if we were to use the GBK + CombineFn, the output of the map phase
>> > >>> > would be..
>> > >>> >
>> > >>> > Mapper 1
>> > >>> > <id2, <"World", 1>>
>> > >>> > <id1, <"I like turtles", 2>>
>> > >>> >
>> > >>> > Mapper 2
>> > >>> > <id2, <"Goodbye", 1>>
>> > >>> >
>> > >>> > Notice Mapper 1 would only be emitting 2 items instead of 3 and therefore
>> > >>> > less data is sent over the wire and has to be sorted.  Also in the reducer
>> > >>> > after the GBK is completed the CombineFn would finish its work and you'd
>> > >>> > get the following:
>> > >>> >
>> > >>> > Reducer 1
>> > >>> > <id2, <"Goodbye", 2>>
>> > >>> > <id1, <"I like turtles", 2>>
>> > >>> >
>> > >>> > The only case where this would not improve performance is if you never emit
>> > >>> > data for the same key from the same mapper or your mapper doesn't reduce
>> > >>> > the size of the data.
>> > >>> >
>> > >>> >
>> > >>> > On Thu, Oct 17, 2013 at 8:18 PM, Chandan Biswas <cbiswas1983@gmail.com> wrote:
>> > >>> >
>> > >>> > > I have a PTable<String,Comment>, and after the reduce I get a
>> > >>> > > PTable<String, Book>.
>> > >>> > >
>> > >>> > > T--> Comment{ String comment, String author}, U--> Book{String id, String
>> > >>> > > lengthiestComment, int noOfComments}
>> > >>> > >
>> > >>> > > But I wanted to do some aggregations on the map side based on some logic,
>> > >>> > > instead of doing all aggregations on the reduce side.
>> > >>> > > Yes, in the worst case the data flow over the network will remain the same,
>> > >>> > > but sorting will be improved.
>> > >>> > >
>> > >>> > > Thanks,
>> > >>> > > Chandan
>> > >>> > >
>> > >>> > >
>> > >>> > > On Thu, Oct 17, 2013 at 6:46 PM, Josh Wills <jwills@cloudera.com> wrote:
>> > >>> > >
>> > >>> > > > On Thu, Oct 17, 2013 at 4:41 PM, Chandan Biswas <cbiswas1983@gmail.com> wrote:
>> > >>> > > >
>> > >>> > > > > Yeah, I agree with Micah that it will not eliminate the reduce phase
>> > >>> > > > > entirely. But the dummy object of U suggested by Josh (or converting to U
>> > >>> > > > > type in map for every record) will not improve performance because the same
>> > >>> > > > > number of records will be sorted and aggregated in the reduce phase.
>> > >>> > > >
>> > >>> > > >
>> > >>> > > > I don't think that's true-- the records of type U will be combined on the
>> > >>> > > > map-side, which would reduce the amount of data that is pushed over the
>> > >>> > > > network and improve performance.
>> > >>> > > >
>> > >>> > > > Can you give any additional details about what T and U are in this
>> > >>> > > > scenario? :)
>> > >>> > > >
>> > >>> > > >
>> > >>> > > >
>> > >>> > > > > But my point is, can we improve it by applying a combiner where the
>> > >>> > > > > combineFn provides output as a different type? If we have the same type, we
>> > >>> > > > > can use the combiner to do some aggregation on the map side, which improves
>> > >>> > > > > performance. But can we have some mechanism by which the same advantage can
>> > >>> > > > > be achieved when the combineFn emits a different type? I think that emitting
>> > >>> > > > > the same type from CombineFn has restricted its use. Can we have a new
>> > >>> > > > > CombineFn that allows us to output a different type, not only the same type
>> > >>> > > > > as the input?
>> > >>> > > > >
>> > >>> > > > >
>> > >>> > > > > On Thu, Oct 17, 2013 at 5:05 PM, Josh Wills <jwills@cloudera.com> wrote:
>> > >>> > > > >
>> > >>> > > > > > Yeah, my experience in these kinds of situations is that you need to come
>> > >>> > > > > > up with a "dummy" or singleton version of U for the case where there is
>> > >>> > > > > > only a single T and do that conversion on the map side of the job, before
>> > >>> > > > > > the combiner runs. I think Chao had an issue like this awhile ago, where he
>> > >>> > > > > > had a PTable<String, Double> and wanted to write a combiner that would
>> > >>> > > > > > return a PTable<String, Collection<Double>>. The solution was to convert
>> > >>> > > > > > the map-side object to a PTable<String, Collection<Double>>, where the
>> > >>> > > > > > value on the map-side was a singleton list containing just that double
>> > >>> > > > > > value. Does that sort of trick work here?
>> > >>> > > > > >
>> > >>> > > > > >
>> > >>> > > > > > On Thu, Oct 17, 2013 at 2:57 PM, Micah Whitacre <mkwhit@gmail.com> wrote:
>> > >>> > > > > >
>> > >>> > > > > > > Ok so the feature you are trying to achieve is the proactive combination of
>> > >>> > > > > > > data before performing the GBK like the javadoc describes.  Essentially in
>> > >>> > > > > > > that situation the CombineFn is being used as a Combiner[1] to combine the
>> > >>> > > > > > > data local to that mapper before doing the GBK and then further combining
>> > >>> > > > > > > the data in the reduce operation.  It will not necessarily eliminate the
>> > >>> > > > > > > need for all processing in the reduce.
>> > >>> > > > > > >
>> > >>> > > > > > > If you want to use this functionality you will need to do the following:
>> > >>> > > > > > >
>> > >>> > > > > > > PTable<S, T> map to PTable<S, U>
>> > >>> > > > > > > PTable<S, U> gbk to PGT<S, U>
>> > >>> > > > > > > PGT<S, U> combine PTable<S, U>
>> > >>> > > > > > >
>> > >>> > > > > > > This will take advantage of any optimization provided by the CombineFn.
>> > >>> > > > > > >
>> > >>> > > > > > > [1] - http://wiki.apache.org/hadoop/HadoopMapReduce
>> > >>> > > > > > >
>> > >>> > > > > > >
>> > >>> > > > > > >
>> > >>> > > > > > > On Thu, Oct 17, 2013 at 4:30 PM, Chandan Biswas <cbiswas1983@gmail.com> wrote:
>> > >>> > > > > > >
>> > >>> > > > > > > > Hello Micah,
>> > >>> > > > > > > > Yes, we are using a MapFn now. That aggregation and computation is being
>> > >>> > > > > > > > done in the reduce phase. Since a CombineFn after the GBK runs on the map
>> > >>> > > > > > > > side, most of the computation that now runs in the reduce phase could be
>> > >>> > > > > > > > done on the map side. Some smaller aggregations and computations can still
>> > >>> > > > > > > > be done in the reduce phase. My point was to do some aggregation (and
>> > >>> > > > > > > > create a new object) in the map phase instead of in the reduce phase.
>> > >>> > > > > > > >
>> > >>> > > > > > > > Thanks,
>> > >>> > > > > > > > Chandan
>> > >>> > > > > > > >
>> > >>> > > > > > > >
>> > >>> > > > > > > > On Thu, Oct 17, 2013 at 3:48 PM, Micah Whitacre <mkwhit@gmail.com> wrote:
>> > >>> > > > > > > >
>> > >>> > > > > > > > > Chandan,
>> > >>> > > > > > > > >    I think what you are wanting will just be a simple MapFn instead of a
>> > >>> > > > > > > > > CombineFn.  The doc of the CombineFn[1] sounds like what you want with the
>> > >>> > > > > > > > > statement "A special DoFn
>> > >>> > > > > > > > > <http://crunch.apache.org/apidocs/0.7.0/org/apache/crunch/DoFn.html>
>> > >>> > > > > > > > > implementation that converts an Iterable
>> > >>> > > > > > > > > <http://download.oracle.com/javase/6/docs/api/java/lang/Iterable.html?is-external=true>
>> > >>> > > > > > > > > of values into a single value" but it is expecting the value to be of the
>> > >>> > > > > > > > > same type.  Since you are wanting to combine the values into a different
>> > >>> > > > > > > > > form it should be fairly trivial to write a MapFn that converts the
>> > >>> > > > > > > > > Iterable<T> -> U.
>> > >>> > > > > > > > >
>> > >>> > > > > > > > > [1] - http://crunch.apache.org/apidocs/0.7.0/org/apache/crunch/CombineFn.html
>> > >>> > > > > > > > >
>> > >>> > > > > > > > >
>> > >>> > > > > > > > > On Thu, Oct 17, 2013 at 3:30 PM, Chandan Biswas <cbiswas1983@gmail.com> wrote:
>> > >>> > > > > > > > >
>> > >>> > > > > > > > > > I was refactoring some code and trying to use CombineFn. But when I
>> > >>> > > > > > > > > > looked deeper, I found that I can't, because Crunch doesn't allow the
>> > >>> > > > > > > > > > functionality I need. For example, I have a PGroupedTable<S,T>. I wanted
>> > >>> > > > > > > > > > to apply CombineFn<S,T> on it and get PCollection<S,U> instead of T.
>> > >>> > > > > > > > > > Right now, CombineFn allows only the same type as the return value. The
>> > >>> > > > > > > > > > use case for this is that it would save some time in sorting. It's
>> > >>> > > > > > > > > > natural that aggregating some objects on the map side can create an
>> > >>> > > > > > > > > > object of a new, different type.
>> > >>> > > > > > > > > >
>> > >>> > > > > > > > > > Any thoughts on it? Am I missing anything? If this can be written in a
>> > >>> > > > > > > > > > different way using existing functionality, please let me know.
>> > >>> > > > > > > > > >
>> > >>> > > > > > > > > > Thanks
>> > >>> > > > > > > > > > Chandan
>> > >>> > > > > > > > > >
>> > >>> > > > > > > > >
>> > >>> > > > > > > >
>> > >>> > > > > > >
>> > >>> > > > > >
>> > >>> > > > > >
>> > >>> > > > > >
>> > >>> > > > > > --
>> > >>> > > > > > Director of Data Science
>> > >>> > > > > > Cloudera <http://www.cloudera.com>
>> > >>> > > > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
>> > >>> > > > > >
>> > >>> > > > >
>> > >>> > > >
>> > >>> > > >
>> > >>> > > >
>> > >>> > > > --
>> > >>> > > > Director of Data Science
>> > >>> > > > Cloudera <http://www.cloudera.com>
>> > >>> > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
>> > >>> > > >
>> > >>> > >
>> > >>> >
>> > >>>
>> > >>
>> > >>
>> >
>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>
