beam-user mailing list archives

From Lukasz Cwik <>
Subject Re: Why does `Combine.perKey(SerializableFunction)` require same input and output type
Date Fri, 28 Oct 2016 16:06:06 GMT
For it to be considered a combiner, the function needs to be associative
and commutative.

From an API perspective it would be easy to add a
Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). But many
people in the data processing world expect a combine to be
parallelized/optimized by the runner, so exposing such a method would be
dangerous: it would break users' expectations. From the design perspective
it is therefore a hard requirement. If PCollections ever become ordered or
gain other properties, these requirements may loosen, but that seems
unlikely in the short term.

At this point, I think you're looking for a MapElements to which you pass a
SerializableFunction<KV<K, Iterable<InputT>>, KV<K, OutputT>>.
Creating a wrapper SerializableFunction<KV<K, Iterable<InputT>>, KV<K,
OutputT>> that delegates to a SerializableFunction<Iterable<InputT>,
OutputT> should be trivial.
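
A rough sketch of such a wrapper, written in plain Java so it runs without
Beam on the classpath. As an assumption for illustration,
java.util.function.Function stands in for SerializableFunction and Map.Entry
stands in for KV; PerKeyWrapperSketch, perKey and median are hypothetical
names. Median is used because it is a per-key computation that does not fit
the combiner shape.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class PerKeyWrapperSketch {
    // Lifts a function over the grouped values into a function over
    // (key, grouped values) pairs, passing the key through unchanged.
    static <K, InputT, OutputT>
            Function<Map.Entry<K, Iterable<InputT>>, Map.Entry<K, OutputT>> perKey(
                    Function<Iterable<InputT>, OutputT> fn) {
        return kv -> new SimpleImmutableEntry<>(kv.getKey(), fn.apply(kv.getValue()));
    }

    // Example delegate: needs all values for a key at once.
    static Double median(Iterable<Integer> values) {
        List<Integer> sorted = new ArrayList<>();
        for (Integer v : values) {
            sorted.add(v);
        }
        Collections.sort(sorted);
        int n = sorted.size();
        if (n == 0) {
            return Double.NaN;
        }
        return n % 2 == 1
                ? sorted.get(n / 2).doubleValue()
                : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
    }

    public static void main(String[] args) {
        Function<Map.Entry<String, Iterable<Integer>>, Map.Entry<String, Double>> f =
                perKey(PerKeyWrapperSketch::median);
        Iterable<Integer> values = Arrays.asList(5, 1, 3);
        Map.Entry<String, Double> out = f.apply(new SimpleImmutableEntry<>("a", values));
        System.out.println(out.getKey() + " -> " + out.getValue()); // prints a -> 3.0
    }
}
```

In a pipeline the same shape would sit after a GroupByKey, which is what
produces the KV<K, Iterable<InputT>> elements; all values for a key are then
processed in one place, without the combiner optimizations discussed above.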

On Thu, Oct 27, 2016 at 6:17 PM, Manu Zhang <> wrote:

> Thanks for the thorough explanation. I see the benefits for such a
> function.
> My follow-up question is whether this is a hard requirement.
> There are computations that don't satisfy this (I think it's the monoid
> rule) but are possible and easier to write with Combine.perKey(
> SerializableFunction<Iterable<InputT>, OutputT>). It's not difficult to
> provide an underlying CombineFn.
> On Thu, Oct 27, 2016 at 9:47 AM Lukasz Cwik <>
> wrote:
>> Combine.perKey takes a single SerializableFunction which knows how to
>> convert from Iterable<V> to V.
>> It turns out that many runners implement optimizations which allow them to
>> run the combine operation across several machines to parallelize the work
>> and potentially reduce the amount of data they store during a GBK.
>> Such an optimization requires you to actually have three functions:
>> InputT -> AccumulatorT: Creates the intermediate representation which
>> allows for associative combining
>> Iterable<AccumulatorT> -> AccumulatorT: Performs the actual combining
>> AccumulatorT -> OutputT: Extracts the output
>> In the case of Combine.perKey with a SerializableFunction, you're providing
>> Iterable<AccumulatorT> -> AccumulatorT and the other two functions are the
>> identity functions.
>> Supporting a Combine.perKey which goes from Iterable<InputT> -> OutputT
>> would require that the whole computation for a key occur on a single
>> machine, removing the parallelization benefits that runners provide. In
>> almost all cases that is not a good idea.
>> On Wed, Oct 26, 2016 at 6:23 PM, Manu Zhang <>
>> wrote:
>> > Hi all,
>> >
>> > I'm wondering why `Combine.perKey(SerializableFunction)` requires input
>> > and output to be of the same type while `Combine.PerKey` doesn't have
>> > this restriction.
>> >
>> > Thanks,
>> > Manu
>> >
