spark-dev mailing list archives

From "assaf.mendelson" <>
Date Tue, 27 Sep 2016 14:02:13 GMT

I wanted to try to implement
So I started by looking at the implementation of collect_list. My idea was to do the same
as it does, but when adding a new element, if there are already more elements than the
threshold, remove one.
The problem with this is that, since collect_list has no partial aggregation, we would end
up shuffling all the data anyway. So while the resulting column might be smaller, the whole
process would be as expensive as collect_list.
So I thought of adding partial aggregation. The problem is that the merge function receives
a buffer which is in row format. collect_list doesn't use that buffer; it uses its own data
structure for collecting the data.
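
To illustrate why partial aggregation would help here, the following is a minimal sketch in
plain Python, not Spark's aggregation API. The names update, merge and top_n_partitioned are
hypothetical, and the lists stand in for partitions. Each partition is reduced to at most n
values before anything is combined, so only n values per partition would need to be shuffled.

```python
import heapq

def update(buf, value, n):
    """Add one value to a partial buffer, keeping at most the n largest.

    buf is a min-heap, so buf[0] is always the smallest value kept so far.
    """
    if len(buf) < n:
        heapq.heappush(buf, value)
    elif value > buf[0]:
        # Buffer is full: swap out the current minimum.
        heapq.heapreplace(buf, value)
    return buf

def merge(left, right, n):
    """Fold one partial buffer into another; the result still holds at most n values."""
    for value in right:
        update(left, value, n)
    return left

def top_n_partitioned(partitions, n):
    """Compute the top n by reducing each partition first, then merging the partials."""
    partials = []
    for part in partitions:
        buf = []
        for value in part:
            update(buf, value, n)
        partials.append(buf)  # at most n values leave each partition

    result = []
    for buf in partials:
        merge(result, buf, n)
    return sorted(result, reverse=True)
```

Merging the per-partition buffers gives the same answer as a top-n over all the data, which
is the property that makes the partial step safe.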
I could change the implementation to use a Spark ArrayType instead. However, since ArrayType
is immutable, I would need to copy it whenever I change anything.
Consider the simplest implementation of the update function:
If there are few elements => add an element to the array (if I use a regular Array, this
means copying as I grow it, which is fine at this stage).
If there are enough elements => we do not grow the array. Instead we need to decide what
to replace. If we want the top 10, for example, and there are already 10 elements, we need
to drop the lowest and insert the new one.
This means that if we simply loop across the array, we create a new copy and pay for both
the copy and the loop. If we keep it sorted, then adding, sorting and removing the lowest
element means 3 copies.
If I were able to use a Scala array, I would copy only when growing it, and once it has
grown to the maximum, all I would need to do is REPLACE the relevant element, which is much
cheaper.
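
The difference between the two update strategies can be sketched in plain Python. This is an
illustration only: a list stands in for a mutable array, a tuple stands in for an immutable
one, and the function names are hypothetical rather than anything in Spark.

```python
def update_in_place(buf, value, n):
    """Mutable buffer: grow until n elements, then overwrite the lowest one."""
    if len(buf) < n:
        buf.append(value)
        return
    lowest = min(range(n), key=buf.__getitem__)  # one scan to find the lowest element
    if value > buf[lowest]:
        buf[lowest] = value  # single element write, no new array

def update_immutable(buf, value, n):
    """Immutable buffer: every change has to build and return a new tuple."""
    if len(buf) < n:
        return buf + (value,)
    lowest = min(range(n), key=buf.__getitem__)
    if value > buf[lowest]:
        # Same scan as above, plus a full copy of the buffer.
        return buf[:lowest] + (value,) + buf[lowest + 1:]
    return buf
```

Both versions pay for the scan, but only the immutable one also pays for a copy on each
replacement once the buffer is full.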

The only other solution I see is to simply provide a "take first N" aggregate function and
have the user sort beforehand, but this seems a bad solution to me, both because sorting is
expensive and because if we do multiple aggregations we can't sort in two different ways.

I can't find a way to convert an internal buffer of the kind collect_list uses into the
row-format aggregation buffer before the merge.
I also can't find any way to use an array in the internal buffer as a mutable array. If I
look at the GenericMutableRow implementation, updating an array means creating a new one.
I thought of adding a function update_array_element that would change the relevant element
(and similarly get_array_element to read an array element), which would make it easy to
treat the array as mutable, but the documentation states that this is not allowed.

Can anyone give me a tip on where to try to go from here?
