crunch-dev mailing list archives

From Gabriel Reid <gabriel.r...@gmail.com>
Subject Re: Process of CombineFn<S,T> returns <S,U>?
Date Sat, 19 Oct 2013 04:22:17 GMT
On Sat, Oct 19, 2013 at 2:34 AM, Josh Wills <jwills@cloudera.com> wrote:
> I'm certainly not opposed to having something like this. Spark makes this
> distinction via Accumulable vs. Accumulator:
>
> http://spark.incubator.apache.org/docs/0.8.0/api/core/index.html#org.apache.spark.Accumulable
> http://spark.incubator.apache.org/docs/0.8.0/api/core/index.html#org.apache.spark.Accumulator
>
> Maybe we want something like "Aggregatable<R, T>" to go along with our
> Aggregator<T> (which could extend Aggregatable<T, T>)?
>
>

That sounds like a great way of exposing that functionality -- I'll
take a closer look at actually doing it.

- Gabriel
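For illustration only, here is one way the Aggregatable<R, T> / Aggregator<T> split Josh describes might look. It is written as plain Java so it compiles on its own. Every name here (`AggregatableSketch`, `Aggregatable`, `CountStrings`, `SumLongs`, `aggregate`) is hypothetical. This `Aggregator` is a stand-in rather than Crunch's actual interface, which has additional lifecycle methods that are omitted here.

```java
import java.util.Collections;

// Hypothetical interfaces illustrating the Aggregatable<R, T> idea; not Crunch's API.
class AggregatableSketch {

  /** Accepts values of type T and produces results of type R. */
  interface Aggregatable<R, T> {
    void reset();

    void update(T value);

    Iterable<R> results();
  }

  /** The same-type case: an Aggregator<T> is simply an Aggregatable<T, T>. */
  interface Aggregator<T> extends Aggregatable<T, T> {}

  /** Type-changing example: String values in, a single Integer count out. */
  static class CountStrings implements Aggregatable<Integer, String> {
    private int count;

    public void reset() { count = 0; }

    public void update(String value) { count++; }

    public Iterable<Integer> results() { return Collections.singletonList(count); }
  }

  /** Same-type example: Long values in, their Long sum out. */
  static class SumLongs implements Aggregator<Long> {
    private long sum;

    public void reset() { sum = 0L; }

    public void update(Long value) { sum += value; }

    public Iterable<Long> results() { return Collections.singletonList(sum); }
  }

  /** Resets the aggregator, feeds it every value, and returns its results. */
  static <R, T> Iterable<R> aggregate(Aggregatable<R, T> agg, Iterable<T> values) {
    agg.reset();
    for (T v : values) {
      agg.update(v);
    }
    return agg.results();
  }
}
```

The sketch also shows what it leaves out. A type-changing `Aggregatable` cannot be re-applied to its own `R` output. Using one as a map-side combiner would therefore also need some way to merge partial results.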

>
> On Fri, Oct 18, 2013 at 1:36 PM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
>
>> This use case (map/combine <K,V> to <K,U>) seems to come up
>> repeatedly. The solution (map <K,V> to <K, Collection<V>> and then
>> combine) works but is also pretty unintuitive.
>>
>> Any thoughts on adding a util in Crunch to do this? It would basically
>> just need to be a static util method that takes a
>> MapFn<Pair<K,V>, Pair<K,U>> and a CombineFn<K,U> and would take care of
>> the singleton collection mapping stuff internally. On the one hand I'm
>> thinking that this could be pretty useful, but I'm not sure if it would
>> make things more intuitive or possibly have the reverse effect.
>>
>> Any opinions? I'm up for putting it together if people think it's worth it.
>>
>> - Gabriel
>>
>>
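Here is a minimal in-memory sketch of the kind of helper Gabriel proposes. It assumes the helper takes a value conversion (V to U) and a same-type combine function. `MapThenCombine`, `mapThenCombine`, and `kv` are hypothetical names, not Crunch APIs. Each inner list stands in for one mapper's output, and `Map.merge` plays the role of the combiner on both sides of the shuffle.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// In-memory sketch of a hypothetical "map then combine" helper; not a Crunch API.
class MapThenCombine {

  /**
   * Each inner list stands in for the output of one mapper. Values are first
   * converted V -> U, then combined per key locally (the combiner step), and
   * finally the per-mapper partial results are combined again per key (the
   * reduce step). The same combineFn is used in both places, which is why it
   * must take and return the same type U.
   */
  static <K, V, U> Map<K, U> mapThenCombine(
      List<List<Map.Entry<K, V>>> mapperOutputs,
      Function<V, U> valueFn,
      BinaryOperator<U> combineFn) {
    Map<K, U> reduced = new LinkedHashMap<>();
    for (List<Map.Entry<K, V>> partition : mapperOutputs) {
      // Map side: convert and combine within this partition only.
      Map<K, U> local = new LinkedHashMap<>();
      for (Map.Entry<K, V> e : partition) {
        local.merge(e.getKey(), valueFn.apply(e.getValue()), combineFn);
      }
      // Reduce side: combine the partial results from every partition.
      for (Map.Entry<K, U> e : local.entrySet()) {
        reduced.merge(e.getKey(), e.getValue(), combineFn);
      }
    }
    return reduced;
  }

  /** Small helper for building key/value records. */
  static <K, V> Map.Entry<K, V> kv(K k, V v) {
    return new SimpleEntry<>(k, v);
  }
}
```

You can pass a function that wraps each value in a one-element collection, together with a concatenating combine function. That combination gives the singleton-collection trick Josh describes later in the thread.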
>> On Fri, Oct 18, 2013 at 4:14 PM, Micah Whitacre <mkwhit@gmail.com> wrote:
>> > Thinking about the technical issues, at first glance you could say the
>> > restriction is just the way the Java generics are written for the
>> > CombineFn class. But if you think about what is actually happening, it
>> > would be awkward to support changing types in the CombineFn, especially
>> > when it is paired with a GroupByKey. As I showed in the example, the
>> > CombineFn essentially bookends the GBK operation by performing
>> > processing on the types before and after the sorting. The GBK's types
>> > describe the output of the map phase and the input to the reduce. If the
>> > CombineFn changed the types, then its output wouldn't match the types
>> > described by the GBK. I'm guessing this could lead to a number of
>> > problems when trying to compute the types and plan the job.
>> >
>> >
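Micah's point can be made concrete with a small plain-Java simulation. The simulation relies on standard Hadoop behavior: the framework may apply a combiner zero, one, or several times, with no guarantee. Everything downstream therefore has to accept combined and uncombined values alike. `RepeatedCombine`, `combinePass`, and `runWithPasses` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Illustrative simulation; class and method names are hypothetical.
class RepeatedCombine {

  /** One combiner pass: merge adjacent values pairwise, keeping any odd one out. */
  static <U> List<U> combinePass(List<U> values, BinaryOperator<U> op) {
    List<U> out = new ArrayList<>();
    for (int i = 0; i < values.size(); i += 2) {
      if (i + 1 < values.size()) {
        out.add(op.apply(values.get(i), values.get(i + 1)));
      } else {
        out.add(values.get(i));
      }
    }
    return out;
  }

  /**
   * Runs the combiner `passes` times (possibly zero), then performs the final
   * reduce with the same op. Feeding a pass's output back in only type-checks
   * because op maps (U, U) -> U. Assumes mapOutput is non-empty.
   */
  static <U> U runWithPasses(List<U> mapOutput, BinaryOperator<U> op, int passes) {
    List<U> current = mapOutput;
    for (int p = 0; p < passes; p++) {
      current = combinePass(current, op);
    }
    U result = current.get(0);
    for (int i = 1; i < current.size(); i++) {
      result = op.apply(result, current.get(i));
    }
    return result;
  }
}
```

For an associative combine function, the final result is the same whether the combiner runs zero, one, or several times. That holds only when the types before and after the combine step agree.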
>> > On Fri, Oct 18, 2013 at 8:55 AM, Micah Whitacre <mkwhit@gmail.com> wrote:
>> >
>> >> I'm not sure I follow how there is extra effort involved. Are you talking
>> >> about development effort or processing effort? In terms of development
>> >> effort, in both cases you need to write code that translates T to U and
>> >> combines the values. The difference is whether that logic exists inside
>> >> a single DoFn or is split into a MapFn + CombineFn. So the development
>> >> effort should be the same.
>> >>
>> >>
>> >> On Fri, Oct 18, 2013 at 8:11 AM, Chandan Biswas <cbiswas1983@gmail.com> wrote:
>> >>
>> >>> Yeah, I see what you are talking about. But it will take extra effort to
>> >>> convert to type U. So is there any specific reason why CombineFn was
>> >>> originally designed not to allow a different return type? Were there any
>> >>> constraints (design/complexity) that led to this restriction?
>> >>> Thanks,
>> >>>
>> >>>
>> >>> On Thu, Oct 17, 2013 at 8:47 PM, Micah Whitacre <mkwhit@gmail.com> wrote:
>> >>>
>> >>> > Chandan,
>> >>> >    So let's apply your situation to the proposed types and conversion
>> >>> > and break down where the logic will be applied. Say we have a
>> >>> > PCollection like the following:
>> >>> >
>> >>> > Mapper 1:
>> >>> > <id1, "Hello">
>> >>> > <id2, "World">
>> >>> > <id1, "I like turtles">
>> >>> >
>> >>> > Mapper 2:
>> >>> > <id2, "Goodbye">
>> >>> >
>> >>> > This will be represented by the PTable<String, Comment>. We then apply a
>> >>> > MapFn to transform it into a PTable<String, Book>, and we'd get the
>> >>> > following in our PCollection:
>> >>> >
>> >>> > Mapper 1:
>> >>> > <id1, <"Hello", 1>>
>> >>> > <id2, <"World", 1>>
>> >>> > <id1, <"I like turtles", 1>>
>> >>> >
>> >>> > Mapper 2:
>> >>> > <id2, <"Goodbye", 1>>
>> >>> >
>> >>> > Then if we were to use the GBK + CombineFn, the output of the map phase
>> >>> > would be:
>> >>> >
>> >>> > Mapper 1:
>> >>> > <id2, <"World", 1>>
>> >>> > <id1, <"I like turtles", 2>>
>> >>> >
>> >>> > Mapper 2:
>> >>> > <id2, <"Goodbye", 1>>
>> >>> >
>> >>> > Notice that Mapper 1 would only be emitting 2 items instead of 3, so
>> >>> > less data is sent over the wire and has to be sorted. Also, in the
>> >>> > reducer, after the GBK is completed, the CombineFn would finish its work
>> >>> > and you'd get the following:
>> >>> >
>> >>> > Reducer 1:
>> >>> > <id2, <"Goodbye", 2>>
>> >>> > <id1, <"I like turtles", 2>>
>> >>> >
>> >>> > The only cases where this would not improve performance are if you
>> >>> > never emit data for the same key from the same mapper, or if your
>> >>> > mapper doesn't reduce the size of the data.
>> >>> >
>> >>> >
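The walk-through above can be replayed in plain Java to check the numbers. This is a simulation, not Crunch code. `CommentCombineExample`, `combineLocally`, and `reduce` are hypothetical names. `Book` here keeps only the lengthiest comment and the count, the fields shown in the example. The same `Book.merge` serves as both the map-side combiner and the reducer.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java replay of the Mapper 1 / Mapper 2 / Reducer 1 walk-through; names are hypothetical.
class CommentCombineExample {

  /** U in the thread: lengthiest comment seen so far plus a comment count. */
  static final class Book {
    final String lengthiestComment;
    final int noOfComments;

    Book(String lengthiestComment, int noOfComments) {
      this.lengthiestComment = lengthiestComment;
      this.noOfComments = noOfComments;
    }

    /** The "MapFn" step: one comment becomes a singleton Book with count 1. */
    static Book of(String comment) {
      return new Book(comment, 1);
    }

    /** The combine step: keep the longer comment (the first one on ties) and add the counts. */
    static Book merge(Book a, Book b) {
      String longer = b.lengthiestComment.length() > a.lengthiestComment.length()
          ? b.lengthiestComment : a.lengthiestComment;
      return new Book(longer, a.noOfComments + b.noOfComments);
    }

    @Override
    public String toString() {
      return "<\"" + lengthiestComment + "\", " + noOfComments + ">";
    }
  }

  /** Map-side combine for one mapper: {key, comment} records in, one Book per key out. */
  static Map<String, Book> combineLocally(List<String[]> mapperInput) {
    Map<String, Book> local = new LinkedHashMap<>();
    for (String[] kv : mapperInput) {
      local.merge(kv[0], Book.of(kv[1]), Book::merge);
    }
    return local;
  }

  /** Reduce-side combine across all mappers' already-combined output. */
  static Map<String, Book> reduce(List<Map<String, Book>> mapperOutputs) {
    Map<String, Book> result = new LinkedHashMap<>();
    for (Map<String, Book> out : mapperOutputs) {
      out.forEach((k, v) -> result.merge(k, v, Book::merge));
    }
    return result;
  }
}
```

Mapper 1's three input records collapse to two combined records, matching the example. The reducer then produces <"Goodbye", 2> for id2 and <"I like turtles", 2> for id1.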
>> >>> > On Thu, Oct 17, 2013 at 8:18 PM, Chandan Biswas <cbiswas1983@gmail.com> wrote:
>> >>> >
>> >>> > > I have a PTable<String, Comment>, and after the reduce I get a
>> >>> > > PTable<String, Book>.
>> >>> > >
>> >>> > > T --> Comment{String comment, String author}, U --> Book{String id,
>> >>> > > String lengthiestComment, int noOfComments}
>> >>> > >
>> >>> > > But I wanted to do some of the aggregation on the map side, based on
>> >>> > > some logic, instead of doing all of the aggregation on the reduce side.
>> >>> > > Yes, in the worst case the data flow over the network will remain the
>> >>> > > same, but sorting will be improved.
>> >>> > >
>> >>> > > Thanks,
>> >>> > > Chandan
>> >>> > >
>> >>> > >
>> >>> > > On Thu, Oct 17, 2013 at 6:46 PM, Josh Wills <jwills@cloudera.com> wrote:
>> >>> > >
>> >>> > > > On Thu, Oct 17, 2013 at 4:41 PM, Chandan Biswas <cbiswas1983@gmail.com> wrote:
>> >>> > > >
>> >>> > > > > Yeah, I agree with Micah that it will not eliminate the reduce
>> >>> > > > > phase entirely. But the dummy object of U suggested by Josh (or
>> >>> > > > > converting to type U in the map for every record) will not improve
>> >>> > > > > performance, because the same number of records will be sorted and
>> >>> > > > > aggregated in the reduce phase.
>> >>> > > >
>> >>> > > >
>> >>> > > > I don't think that's true-- the records of type U will be combined on
>> >>> > > > the map-side, which would reduce the amount of data that is pushed
>> >>> > > > over the network and improve performance.
>> >>> > > >
>> >>> > > > Can you give any additional details about what T and U are in this
>> >>> > > > scenario? :)
>> >>> > > >
>> >>> > > >
>> >>> > > >
>> >>> > > > > But my point is, can we improve it by applying a combiner whose
>> >>> > > > > CombineFn produces output of a different type? If we have the same
>> >>> > > > > type, we can use the combiner to do some aggregation on the map
>> >>> > > > > side, which improves performance. But can we have some mechanism by
>> >>> > > > > which the same advantage can be achieved when the CombineFn emits a
>> >>> > > > > different type? I think requiring CombineFn to emit the same type
>> >>> > > > > has restricted its use. Can we have a new CombineFn that allows us
>> >>> > > > > to output a different type, not only the same type as the input?
>> >>> > > > >
>> >>> > > > >
>> >>> > > > > On Thu, Oct 17, 2013 at 5:05 PM, Josh Wills <jwills@cloudera.com> wrote:
>> >>> > > > >
>> >>> > > > > > Yeah, my experience in these kinds of situations is that you need
>> >>> > > > > > to come up with a "dummy" or singleton version of U for the case
>> >>> > > > > > where there is only a single T, and do that conversion on the map
>> >>> > > > > > side of the job, before the combiner runs. I think Chao had an
>> >>> > > > > > issue like this a while ago, where he had a PTable<String, Double>
>> >>> > > > > > and wanted to write a combiner that would return a
>> >>> > > > > > PTable<String, Collection<Double>>. The solution was to convert the
>> >>> > > > > > map-side object to a PTable<String, Collection<Double>>, where the
>> >>> > > > > > value on the map side was a singleton list containing just that
>> >>> > > > > > double value. Does that sort of trick work here?
>> >>> > > > > >
>> >>> > > > > >
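The trick Josh describes can be sketched in plain Java. Each `Double` is wrapped in a one-element list on the map side, so the combine function only ever sees `Collection<Double>` going in and `Collection<Double>` coming out. `SingletonCollectionTrick`, `toSingleton`, `concat`, and `collect` are illustrative names, not Crunch APIs.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the singleton-collection trick; not Crunch code.
class SingletonCollectionTrick {

  /** Map side: wrap each Double in a one-element list, so T = Double becomes U = Collection<Double>. */
  static Collection<Double> toSingleton(Double value) {
    return Collections.singletonList(value);
  }

  /** Combine: Collection<Double> x Collection<Double> -> Collection<Double>, same type in and out. */
  static Collection<Double> concat(Collection<Double> a, Collection<Double> b) {
    List<Double> out = new ArrayList<>(a); // copy, since singleton lists are immutable
    out.addAll(b);
    return out;
  }

  /** Wraps every value, then combines per key. */
  static Map<String, Collection<Double>> collect(List<Map.Entry<String, Double>> records) {
    Map<String, Collection<Double>> out = new LinkedHashMap<>();
    for (Map.Entry<String, Double> r : records) {
      out.merge(r.getKey(), toSingleton(r.getValue()), SingletonCollectionTrick::concat);
    }
    return out;
  }
}
```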
>> >>> > > > > > On Thu, Oct 17, 2013 at 2:57 PM, Micah Whitacre <mkwhit@gmail.com> wrote:
>> >>> > > > > >
>> >>> > > > > > > OK, so the feature you are trying to achieve is the proactive
>> >>> > > > > > > combination of data before performing the GBK, as the javadoc
>> >>> > > > > > > describes. Essentially, in that situation the CombineFn is being
>> >>> > > > > > > used as a Combiner[1] to combine the data local to that mapper
>> >>> > > > > > > before doing the GBK, and then to further combine the data in the
>> >>> > > > > > > reduce operation. It will not necessarily eliminate the need for
>> >>> > > > > > > all processing in the reduce.
>> >>> > > > > > >
>> >>> > > > > > > If you want to use this functionality, you will need to do the
>> >>> > > > > > > following:
>> >>> > > > > > >
>> >>> > > > > > > PTable<S, T> map to PTable<S, U>
>> >>> > > > > > > PTable<S, U> gbk to PGT<S, U> (PGroupedTable)
>> >>> > > > > > > PGT<S, U> combine to PTable<S, U>
>> >>> > > > > > >
>> >>> > > > > > > This will take advantage of any optimization provided by the
>> >>> > > > > > > CombineFn.
>> >>> > > > > > >
>> >>> > > > > > > [1] - http://wiki.apache.org/hadoop/HadoopMapReduce
>> >>> > > > > > >
>> >>> > > > > > >
>> >>> > > > > > >
>> >>> > > > > > > On Thu, Oct 17, 2013 at 4:30 PM, Chandan Biswas <cbiswas1983@gmail.com> wrote:
>> >>> > > > > > >
>> >>> > > > > > > > Hello Micah,
>> >>> > > > > > > > Yes, we are using a MapFn now. That aggregation and computation
>> >>> > > > > > > > is being done in the reduce phase. Since a CombineFn after a GBK
>> >>> > > > > > > > also runs on the map side, most of the computations that now run
>> >>> > > > > > > > in the reduce phase could be done on the map side instead, with
>> >>> > > > > > > > only some smaller aggregations and computations left for the
>> >>> > > > > > > > reduce phase. My point was to do some aggregation (and create a
>> >>> > > > > > > > new object) in the map phase instead of in the reduce phase.
>> >>> > > > > > > >
>> >>> > > > > > > > Thanks,
>> >>> > > > > > > > Chandan
>> >>> > > > > > > >
>> >>> > > > > > > >
>> >>> > > > > > > > On Thu, Oct 17, 2013 at 3:48 PM, Micah Whitacre <mkwhit@gmail.com> wrote:
>> >>> > > > > > > >
>> >>> > > > > > > > > Chandan,
>> >>> > > > > > > > >    I think what you want will just be a simple MapFn
>> >>> > > > > > > > > instead of a CombineFn. The doc of the CombineFn[1] sounds
>> >>> > > > > > > > > like what you want, with the statement "A special DoFn
>> >>> > > > > > > > > implementation that converts an Iterable of values into a
>> >>> > > > > > > > > single value", but it expects the value to be of the same
>> >>> > > > > > > > > type. Since you want to combine the values into a different
>> >>> > > > > > > > > form, it should be fairly trivial to write a MapFn that
>> >>> > > > > > > > > converts the Iterable<T> -> U.
>> >>> > > > > > > > >
>> >>> > > > > > > > > [1] - http://crunch.apache.org/apidocs/0.7.0/org/apache/crunch/CombineFn.html
>> >>> > > > > > > > >
>> >>> > > > > > > > >
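For comparison, the reduce-side-only approach Micah suggests is a single function from a key and its `Iterable<Comment>` to a `Book`. In Crunch, this logic would sit inside a `MapFn` applied to the grouped table's `Pair<String, Iterable<Comment>>` records. The sketch below keeps only the core logic as a plain static method so it runs on its own. `IterableToBook` and `toBook` are hypothetical names, and `Comment` and `Book` mirror the fields Chandan lists.

```java
// Hypothetical types mirroring the fields listed in the thread; not Crunch code.
class IterableToBook {

  static final class Comment {
    final String comment;
    final String author;

    Comment(String comment, String author) {
      this.comment = comment;
      this.author = author;
    }
  }

  static final class Book {
    final String id;
    final String lengthiestComment;
    final int noOfComments;

    Book(String id, String lengthiestComment, int noOfComments) {
      this.id = id;
      this.lengthiestComment = lengthiestComment;
      this.noOfComments = noOfComments;
    }
  }

  /**
   * Core logic of a reduce-side MapFn: (key, Iterable<Comment>) -> Book.
   * Nothing is combined map-side, so every Comment reaches the reducer.
   * A grouped key always has at least one value, so lengthiest is non-null in practice.
   */
  static Book toBook(String id, Iterable<Comment> comments) {
    String lengthiest = null;
    int count = 0;
    for (Comment c : comments) {
      if (lengthiest == null || c.comment.length() > lengthiest.length()) {
        lengthiest = c.comment;
      }
      count++;
    }
    return new Book(id, lengthiest, count);
  }
}
```

Here every `Comment` for a key has to reach the reducer before `toBook` runs. That is exactly the cost the map-side combiner approach discussed in the replies tries to avoid.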
>> >>> > > > > > > > > On Thu, Oct 17, 2013 at 3:30
PM, Chandan Biswas <
>> >>> > > > > > cbiswas1983@gmail.com
>> >>> > > > > > > > > >wrote:
>> >>> > > > > > > > >
>> >>> > > > > > > > > > I was trying to refactor some code to use a CombineFn, but
>> >>> > > > > > > > > > when I went deeper, I found that I can't, because Crunch
>> >>> > > > > > > > > > doesn't provide the functionality I need. For example, I have
>> >>> > > > > > > > > > a PGroupedTable<S,T>. I wanted to apply a CombineFn<S,T> to it
>> >>> > > > > > > > > > and get a PTable<S,U> instead of a PTable<S,T>. Right now,
>> >>> > > > > > > > > > CombineFn allows only the same type as the return value. The
>> >>> > > > > > > > > > motivation for this is that it would save some time in
>> >>> > > > > > > > > > sorting: when aggregating objects on the map side, it's
>> >>> > > > > > > > > > natural to create an object of a new, different type.
>> >>> > > > > > > > > >
>> >>> > > > > > > > > > Any thoughts on it? Am I missing anything? If this can be
>> >>> > > > > > > > > > written a different way using existing functionality, please
>> >>> > > > > > > > > > let me know.
>> >>> > > > > > > > > >
>> >>> > > > > > > > > > Thanks
>> >>> > > > > > > > > > Chandan
>> >>> > > > > > > > > >
>> >>> > > > > > > > >
>> >>> > > > > > > >
>> >>> > > > > > >
>> >>> > > > > >
>> >>> > > > > >
>> >>> > > > > >
>> >>> > > > > > --
>> >>> > > > > > Director of Data Science
>> >>> > > > > > Cloudera <http://www.cloudera.com>
>> >>> > > > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
>> >>> > > > > >
>> >>> > > > >
>> >>> > > >
>> >>> > > >
>> >>> > > >
>> >>> > > >
>> >>> > >
>> >>> >
>> >>>
>> >>
>> >>
>>
>
>
>
