beam-dev mailing list archives

From Luke Cwik <>
Subject Re: Composable DoFn IOs Connection Reuse
Date Thu, 02 Jul 2020 03:53:32 GMT
I see, the splitting of state shards is related to splitting of splittable DoFns.

On Tue, Jun 30, 2020 at 3:36 PM Kenneth Knowles <> wrote:

> I agree at the level of GroupIntoBatches. The part that is similar is if
> you implement GroupIntoBatches with a new primitive supporting
> runner-scoped state where the state is per-shard and the runner is able to
> split state shards.
> Kenn
> On Tue, Jun 30, 2020 at 9:22 AM Luke Cwik <> wrote:
>> I'm not sure that runner determined sharding/GroupIntoBatches applies to
>> splittable DoFns. Splittable DoFns are about taking one element and having a
>> high fan-out / high cost function broken down to smaller pieces while
>> runner determined sharding/GroupIntoBatches is about taking a lot of small
>> elements and doing something with all of them together at once (e.g.
>> service call).
>> On Mon, Jun 29, 2020 at 3:00 PM Siyuan Chen <> wrote:
>>> --
>>> Best regards,
>>> Siyuan
>>> On Mon, Jun 29, 2020 at 1:06 PM Kenneth Knowles <> wrote:
>>>> Great doc.
>>>> The extended API makes sense. I like how it removes a knob. The
>>>> question that I have to ask is whether there is a core model change here
>>>> and whether we can avoid it. Defining "shard" as a scope of state within which
>>>> execution is observably serial, today the model has key+window sharding
>>>> always. It seems like two options mentioned are:
>>>>  - new primitive way for runner to inject shard keys, so standard
>>>> stateful ParDo works
>>>>  - new primitive stateful ParDo with runner-defined scope of state that
>>>> is not observable to users
>>>> Seems on the doc that the latter is favored. I like it better, too. I
>>>> do not know how dynamic resharding would work for either of these. With
>>>> dynamic resharding the latter seems related to SDF.
>>> The runner-side implementation can be the same for both cases. One way
>>> would be to generate a shard number for a given input key when the data is
>>> emitted by an upstream transform and passed back to the runner, and then
>>> the runner can distribute the data to the corresponding shard for
>>> processing GroupIntoBatches. It's dynamic in that the data with the same
>>> key emitted at different times can be tagged with different shard numbers.
>>> SDF is mainly for sources AFAIU so is different (not familiar with that so
>>> correct me if I am wrong :p).
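[Editor's sketch] The shard-tagging idea described above can be illustrated in plain Java, outside of Beam. This is only a rough sketch under stated assumptions: the class `ShardedBatcher`, the round-robin shard assignment, the fixed `numShards`, and the `key#shard` encoding are all hypothetical and are not taken from the proposal or from any Beam API. It only shows that elements with the same key, emitted at different times, can land in different shards while each shard still batches up to a maximum size.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration: batch values per (key, shard) instead of per key. */
class ShardedBatcher {
    private final int maxBatchSize;
    private final int numShards; // assumed fixed here; a runner could choose it dynamically
    private final Map<String, Integer> nextShard = new HashMap<>();
    private final Map<String, List<String>> buffers = new HashMap<>();

    ShardedBatcher(int maxBatchSize, int numShards) {
        this.maxBatchSize = maxBatchSize;
        this.numShards = numShards;
    }

    /** Stand-in for the runner: tags each element of a key with a shard number, round robin. */
    int assignShard(String key) {
        int shard = nextShard.getOrDefault(key, 0);
        nextShard.put(key, (shard + 1) % numShards);
        return shard;
    }

    /** Buffers the value in its shard; returns a full batch once the shard holds maxBatchSize values, else null. */
    List<String> add(String key, String value) {
        String shardedKey = key + "#" + assignShard(key);
        List<String> buffer = buffers.computeIfAbsent(shardedKey, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() < maxBatchSize) {
            return null;
        }
        buffers.remove(shardedKey);
        return buffer;
    }

    /** Emits whatever partial batches remain, e.g. at the end of a window. */
    List<List<String>> flush() {
        List<List<String>> partial = new ArrayList<>(buffers.values());
        buffers.clear();
        return partial;
    }
}
```

Because each shard has its own buffer, different workers could in principle own different shards of the same hot key; the sharded key never has to be shown to the pipeline author.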
>>>> Suppose we say that the semantics of the URN+payload "GroupIntoBatch {
>>>> maxSize = <n> }" allows any segmentation of the PCollection into batches
>>>> size <= n. The runner can replace it with the proposed implementation.
>>>> What does the default implementation look like? If the default
>>>> implementation is not good (like, say, "reshuffle") then maybe figuring out
>>>> the primitive and adding it is better than leaving a transform that is sort
>>>> of a hack when not replaced.
>>> It's a good point that we should think more carefully when extending the
>>> API. The default implementation of `ofSize()` or other similar limits like
>>> `ofBytes()` would behave like a normal stateful DoFn. (`ofMaxSize` sounds
>>> similar to `ofSize()` since ofSize() guarantees emitting a fixed number of
>>> elements whenever possible and emitting a partial result otherwise.)
>>> Enabling runner determined sharding on a runner that does not support
>>> dynamic sharding would fall back to the current (default) implementation
>>> so there would be no behavioral change from today. Allowing the transform to
>>> accept unkeyed input might have to use a naive sharding though, e.g., pairing
>>> each element with a random key.
>>>> Kenn
>>>> On Mon, Jun 29, 2020 at 9:26 AM Luke Cwik <> wrote:
>>>>> On Fri, Jun 26, 2020 at 3:45 PM Tyson Hamilton <>
>>>>> wrote:
>>>>>> Nice doc by the way, it's concise. Thanks for sharing and I'm excited
>>>>>> to see this feature, particularly the PCollection<T> variant that would
>>>>>> have been useful for the Cloud AI transforms recently introduced.
>>>>>> On Fri, Jun 26, 2020 at 3:25 PM Tyson Hamilton <>
>>>>>> wrote:
>>>>>>> On Fri, Jun 26, 2020 at 12:24 PM Siyuan Chen <>
>>>>>>> wrote:
>>>>>>>> Thanks Luke!
>>>>>>>> Hi, I'm Siyuan and I'm working on the Google Dataflow team. We are
>>>>>>>> faced with similar issues with some sinks commonly used by Dataflow,
>>>>>>>> such as the streaming BigQuery sink. Basically the elements are
>>>>>>>> grouped and batched so some following operations (potentially
>>>>>>>> expensive) can be performed at once on a batch of elements. One
>>>>>>>> problem with the grouping is that it could impose a limit on the
>>>>>>>> parallelism of the DoFn performing those operations. To mitigate the
>>>>>>>> limited parallelism problem, recently I have been looking into the
>>>>>>>> idea of improving the `GroupIntoBatches` transform to allow the
>>>>>>>> grouping to be dynamically sharded and therefore distributed -
>>>>>>>> essentially a "shardable" stateful DoFn. The transform already does
>>>>>>>> grouping and batching on the input KV - grouping on K and batching on
>>>>>>>> V - and it could be extended to be able to shard the key and do
>>>>>>>> batching within each shard (meaning that we would have a sharded form
>>>>>>>> of keys somewhere). The idea is detailed in
>>>>>>>> Along with the proposal, there are two points I would like to ask
>>>>>>>> for advice:
>>>>>>>> - Would there be cases where the sharded keys need to be visible to
>>>>>>>> users? One case where that might be needed would be to apply a
>>>>>>>> stateful DoFn to the sharded output of the GroupIntoBatches, so the
>>>>>>>> semantics of key-to-user state mapping is respected.
>>>>>>> Does exposing an API for the sharded keys change the implementation
>>>>>>> of the feature? If it is only an API change I think it would be best to
>>>>>>> avoid exposing the keys to start with to avoid any unnecessary
>>>>>>> constraints on the implementation. It seems like it could make it more
>>>>>>> difficult to modify the sharding implementation in the future
>>>>>>> unnecessarily at this point.
>>>>> Exposing the shard id makes it such that the shard id must cross the
>>>>> portability APIs during execution. If we don't expose it then it could be
>>>>> implemented completely within the runner and all the SDK has to do is say
>>>>> that this stateful transform supports sharding or this transform is the
>>>>> GroupIntoBatches transform.
>>>>>>>> - Would there be a need to have a per element shard id, or would a
>>>>>>>> per bundle shard id be sufficient? The former is more general and we
>>>>>>>> could still have the same shard id for all elements in a bundle. But
>>>>>>>> the conclusion would potentially affect the way of implementation
>>>>>>>> (like how the sharding information should be passed across FnAPI for
>>>>>>>> example).
>>>>>>> Are you referring to an API for a pipeline author to get the shard
>>>>>>> id? I thought that a bundle isn't a pipeline author abstraction but an
>>>>>>> implementation detail, I may be wrong in this since I'm not too familiar
>>>>>>> with this area of code. In the proposal it looks like the shard id isn't
>>>>>>> exposed, I prefer this, as I'm not sure there is any value for the user in
>>>>>>> having a specific 'shard id'. Is there?
>>>>> A bundle is exposed to a pipeline author since they regularly want to
>>>>> know the lifetime (e.g. startBundle/finishBundle on DoFn) of a bundle to
>>>>> amortize setup/teardown at the bundle level.
>>>>> Exposing the shard id is the big question and is there a use case for
>>>>> it. I hope the community can provide guidance here otherwise I'm with
>>>>> Tyson and also agree that exposing it isn't necessary.
>>>>>>>> I'm very new to Beam so looking forward to hearing the thoughts
>>>>>>>> from the community. Any comments will be appreciated :)
>>>>>>>> --
>>>>>>>> Best regards,
>>>>>>>> Siyuan
>>>>>>>> On Tue, Jun 16, 2020 at 3:04 PM Luke Cwik <> wrote:
>>>>>>>>> My first idea was to use a connection pool that is shared across
>>>>>>>>> the entire worker across multiple bundles. The connection pool would
>>>>>>>>> TTL connections that have been unused. This would help a bunch up
>>>>>>>>> until you hit the problem where you don't want every worker
>>>>>>>>> connected to every resource because of sharding of the work. In this
>>>>>>>>> case we should really be making sure that workers that have
>>>>>>>>> processed the same "key" process the same "key" again without
>>>>>>>>> limiting the number of workers that can process a specific key. This
>>>>>>>>> is very similar to what we do with a stateful DoFn but one where the
>>>>>>>>> runner knows that it can "shard" the key. +Siyuan has been
>>>>>>>>> investigating something like this for Dataflow to solve scalability
>>>>>>>>> issues with the BigQuery sink and has been looking into how a better
>>>>>>>>> GroupIntoBatches and/or sharded stateful DoFn could really help in
>>>>>>>>> these situations. This applies in general to lots of things where we
>>>>>>>>> want to co-locate things with the same key but not limit the
>>>>>>>>> parallel processing to only a single worker like stateful DoFn does
>>>>>>>>> today.
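[Editor's sketch] A worker-wide connection pool with a TTL on unused connections, as described in the message above, might look roughly like the following in plain Java. Everything here is an assumption made for illustration: `SharedConnections`, its nested `Connection` interface, the string `spec` key, and the explicit `nowMillis` arguments are hypothetical names, not Beam or connector APIs. "Worker-wide" is approximated by a static map, i.e. one pool per JVM.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical per-JVM cache of connections keyed by a connection spec. */
final class SharedConnections {
    /** Stand-in for a real client connection. */
    interface Connection extends AutoCloseable {
        @Override
        void close();
    }

    /** A cached connection plus the last time it was handed out. */
    private static final class Slot {
        final Connection connection;
        volatile long lastUsedMillis;

        Slot(Connection connection, long nowMillis) {
            this.connection = connection;
            this.lastUsedMillis = nowMillis;
        }
    }

    private static final Map<String, Slot> CACHE = new ConcurrentHashMap<>();

    private SharedConnections() {}

    /** Returns the cached connection for the spec, creating it with the factory on first use. */
    static Connection get(String spec, Function<String, Connection> factory, long nowMillis) {
        Slot slot = CACHE.compute(spec, (s, existing) -> {
            Slot result = (existing != null) ? existing : new Slot(factory.apply(s), nowMillis);
            result.lastUsedMillis = nowMillis;
            return result;
        });
        return slot.connection;
    }

    /** Closes and removes connections idle for at least ttlMillis; returns how many were closed. */
    static int evictIdle(long ttlMillis, long nowMillis) {
        int closed = 0;
        for (Iterator<Map.Entry<String, Slot>> it = CACHE.entrySet().iterator(); it.hasNext();) {
            Slot slot = it.next().getValue();
            if (nowMillis - slot.lastUsedMillis >= ttlMillis) {
                it.remove();
                slot.connection.close();
                closed++;
            }
        }
        return closed;
    }
}
```

A DoFn could call `get` from its per-element method and rely on a periodic `evictIdle` call instead of `@Teardown`. Note that this sketch does not protect a caller that is still using a connection when eviction closes it; a real pool would need leases or reference counting, and it does nothing about the "every worker connected to every resource" problem raised in the message.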
>>>>>>>>> On Tue, Jun 16, 2020 at 2:44 PM Ismaël Mejía <>
>>>>>>>>> wrote:
>>>>>>>>>> We have been promoting the use of DoFn to write IO connectors for
>>>>>>>>>> many reasons including better composability. A common pattern that
>>>>>>>>>> arises in such IOs is that a preceding transform prepares the
>>>>>>>>>> specification element on split that a subsequent DoFn uses to read
>>>>>>>>>> the data. You can see an example of this on FileIO [1] or in
>>>>>>>>>> RedisIO [2].
>>>>>>>>>> The issue is that if we process that spec in the `@ProcessElement`
>>>>>>>>>> method we lose the DoFn lifecycle because we cannot establish a
>>>>>>>>>> connection on `@Setup` and close it in `@Teardown` because the spec
>>>>>>>>>> is per element, so we end up re-creating connections, which is a
>>>>>>>>>> quite costly operation in some systems like Cassandra/HBase/etc and
>>>>>>>>>> could end up saturating the data store because of the massive
>>>>>>>>>> creation of connections (something that already happened in the
>>>>>>>>>> past with JdbcIO in the streaming case).
>>>>>>>>>> In the ongoing PR that transforms Cassandra to be DoFn based [3]
>>>>>>>>>> this subject appeared again, and we were discussing how to
>>>>>>>>>> eventually reuse connections, maybe by a pretty naive approach of
>>>>>>>>>> saving a previous connection (or set of identified connections)
>>>>>>>>>> statically so it can be reused by multiple DoFn instances. We
>>>>>>>>>> already had some issues in the past because of creating many
>>>>>>>>>> connections on other IOs (JdbcIO) with streaming pipelines where
>>>>>>>>>> databases were swamped by massive amounts of connections, so
>>>>>>>>>> reusing connections seems to be something that matters, but at the
>>>>>>>>>> moment we do not have a clear way to do this better.
>>>>>>>>>> Anyone have better ideas or recommendations for this?
>>>>>>>>>> Thanks in advance.
>>>>>>>>>> Ismaël
>>>>>>>>>> [1]
>>>>>>>>>> [2]
>>>>>>>>>> [3]
