From: Luke Cwik <lcwik@google.com>
Date: Wed, 1 Jul 2020 20:53:32 -0700
Subject: Re: Composable DoFn IOs Connection Reuse
To: dev@beam.apache.org
I see, the splitting of state shards is related to splitting of splittable DoFns.

On Tue, Jun 30, 2020 at 3:36 PM Kenneth Knowles <kenn@apache.org> wrote:

I agree at the level of GroupIntoBatches. The part that is similar is if you implement GroupIntoBatches with a new primitive supporting runner-scoped state where the state is per-shard and the runner is able to split state shards.

Kenn

On Tue, Jun 30, 2020 at 9:22 AM Luke Cwik <lcwik@google.com> wrote:

I'm not sure that runner determined sharding/GroupIntoBatches applies to splittable DoFns. Splittable DoFns is about taking one element and having a high fan-out / high cost function broken down to smaller pieces while runner determined sharding/GroupIntoBatches is about taking a lot of small elements and doing something with all of them together at once (e.g. service call).
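A minimal sketch of the batching half of that distinction, using the existing GroupIntoBatches API (the println stands in for a hypothetical expensive service call; the data is made up for illustration):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;

public class BatchPerKeyExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(Create.of(KV.of("k", "v1"), KV.of("k", "v2"), KV.of("k", "v3")))
        // Buffer up to 2 values per key, then emit the whole batch downstream.
        .apply(GroupIntoBatches.<String, String>ofSize(2))
        .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
          @ProcessElement
          public void process(@Element KV<String, Iterable<String>> batch) {
            // Stand-in for one expensive call (e.g. a service RPC) per batch.
            System.out.println(batch.getKey() + " -> " + batch.getValue());
          }
        }));
    p.run().waitUntilFinish();
  }
}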


On Mon, Jun 29, 2020 at 3:00 PM Siyuan Chen <sychen@google.com> wrote:

--
Best regards,
Siyuan


On Mon, Jun 29, 2020 at 1:06 PM Kenneth Knowles <kenn@apache.org> wrote:
Great doc.

The extended API makes sense. I like how it removes a knob. The question that I have to ask is whether there is a core model change here or can we avoid it. Defining "shard" as a scope of state within which execution is observably serial, today the model has key+window sharding always. It seems like two options mentioned are:

 - new primitive way for runner to inject shard keys, so standard stateful ParDo works
 - new primitive stateful ParDo with runner-defined scope of state that is not observable to users

Seems on the doc that the latter is favored. I like it better, too. I do not know how dynamic resharding would work for either of these. With dynamic resharding the latter seems related to SDF.

The runner-side implementation can be the same for both cases. One way would be to generate a shard number for a given input key when the data is emitted by an upstream transform and passed back to the runner, and then the runner can distribute the data to the corresponding shard for processing GroupIntoBatches. It's dynamic in that the data with the same key emitted at different times can be tagged with different shard numbers. SDF is mainly for sources AFAIU so is different (not familiar with that so correct me if I am wrong :p).

Suppose we say that the semantics of the URN+payload "GroupIntoBatches { maxSize = <n> }" allows any segmentation of the PCollection into batches of size <= n. The runner can replace it with the proposed implementation. But what does the default implementation look like? If the default implementation is not good (like, say, "reshuffle") then maybe figuring out the primitive and adding it is better than leaving a transform that is sort of a hack when not replaced.

It's a good point that we should think more carefully when extending the API. The default implementation of `ofSize()` or other similar limits like `ofBytes()` would behave like a normal stateful DoFn. (`ofMaxSize` sounds similar to `ofSize()` since ofSize() guarantees emitting a fixed number of elements whenever possible and emitting a partial result otherwise.) Enabling runner determined sharding on a runner that does not support dynamic sharding would fall back to the current (default) implementation, so no behavioral changes from today. Allowing the transform to accept unkeyed input might have to use a naive sharding though, e.g., pairing each element with a random key.
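To make "behave like a normal stateful DoFn" concrete, here is a rough sketch of the default single-shard-per-key batching logic (sketch only; it ignores the event-time timer the real transform uses to flush partial batches at window end):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// All elements for a key pass through one serial execution context --
// exactly the parallelism limit under discussion.
class BatchOfSizeFn extends DoFn<KV<String, String>, KV<String, Iterable<String>>> {
  private static final long MAX_SIZE = 100;

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag(StringUtf8Coder.of());

  @StateId("count")
  private final StateSpec<ValueState<Long>> countSpec = StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @StateId("buffer") BagState<String> buffer,
      @StateId("count") ValueState<Long> count,
      OutputReceiver<KV<String, Iterable<String>>> out) {
    Long stored = count.read();
    long n = (stored == null ? 0 : stored) + 1;
    buffer.add(element.getValue());
    if (n >= MAX_SIZE) {
      // Materialize the bag before clearing it, then reset per-key state.
      List<String> batch = new ArrayList<>();
      buffer.read().forEach(batch::add);
      out.output(KV.of(element.getKey(), batch));
      buffer.clear();
      count.clear();
    } else {
      count.write(n);
    }
  }
}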

Kenn

On Mon, Jun 29, 2020 at 9:26 AM Luke Cwik <lcwik@google.com> wrote:


On Fri, Jun 26, 2020 at 3:45 PM Tyson Hamilton <tysonjh@google.com> wrote:

Nice doc by the way, it's concise. Thanks for sharing and I'm excited to see this feature, particularly the PCollection<T> variant that would have been useful for the Cloud AI transforms recently introduced.

On Fri, Jun 26, 2020 at 3:25 PM Tyson Hamilton <tysonjh@google.com> wrote:


On Fri, Jun 26, 2020 at 12:24 PM Siyuan Chen <sychen@google.com> wrote:
Thanks Luke!

Hi, I'm Siyuan and I'm working on the Google Dataflow team. We are faced with similar issues with some sinks commonly used by Dataflow such as the streaming BigQuery sink. Basically the elements are grouped and batched so some following operations (potentially expensive) can be performed at once on a batch of elements. One problem with the grouping is that it could impose a limit on the parallelism of the DoFn performing those operations. To mitigate the limited parallelism problem, recently I have been looking into the idea of improving the `GroupIntoBatches` transform to allow the grouping to be dynamically sharded and therefore distributed - essentially a "shardable" stateful DoFn. The transform already does grouping and batching on the input KV - grouping on K and batching on V - and it could be extended to be able to shard the key and do batching within each shard (meaning that we would have a sharded form of keys somewhere). The idea is detailed in https://s.apache.org/sharded-group-into-batches
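To make the "sharded form of keys" concrete, here is a hypothetical sketch of the naive version of the idea - salting each key with a shard number before GroupIntoBatches so one hot key can be batched on several workers (SaltKeyFn and NUM_SHARDS are illustrative, not proposed API; in the proposal the runner would pick shard numbers dynamically):

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Illustrative stand-in for runner-assigned sharding: grouping then happens
// per (key, shard) instead of per key, at the cost of up to NUM_SHARDS
// partial batches per key.
class SaltKeyFn extends DoFn<KV<String, String>, KV<KV<String, Integer>, String>> {
  private static final int NUM_SHARDS = 10; // fixed here; dynamic in the proposal

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      OutputReceiver<KV<KV<String, Integer>, String>> out) {
    int shard = ThreadLocalRandom.current().nextInt(NUM_SHARDS);
    out.output(KV.of(KV.of(element.getKey(), shard), element.getValue()));
  }
}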

Along with the proposal, there are two points I would like to ask for advice:
- Would there be cases where the sharded keys need to be visible to users? One case where that might be needed would be to apply another stateful DoFn to the sharded output of the GroupIntoBatches, so the semantics of key-to-user state mapping is respected.

Does exposing an API for the sharded keys change the implementation of the feature? If it is only an API change I think it would be best to avoid exposing the keys to start with, to avoid any unnecessary dependency on the implementation. It seems like it could unnecessarily make it more difficult to modify the sharding implementation in the future.

Exposing the shard id makes it such that the shard id must cross the portability APIs during execution. If we don't expose it then it could be implemented completely within the runner, and all the SDK has to do is say that this stateful transform supports sharding or that this transform is the GroupIntoBatches transform.

- Would there be a need to have a per-element shard id, or would a per-bundle shard id be sufficient? The former is more general and we could still have the same shard id for all elements in a bundle. But the conclusion would potentially affect the way of implementation (like how the sharding information should be passed across the Fn API, for example).


Are you referring to an API for a pipeline author to get the shard id? I thought that a bundle isn't a pipeline author abstraction but an implementation detail; I may be wrong in this since I'm not too familiar with this area of code. In the proposal it looks like the shard id isn't exposed. I prefer this, as I'm not sure there is any value for the user in having a specific 'shard id'. Is there?

A bundle is exposed to a pipeline author since they regularly want to know the lifetime (e.g. startBundle/finishBundle on DoFn) of a bundle to amortize setup/teardown at the bundle level.
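For anyone unfamiliar, that bundle-level amortization pattern looks roughly like this (Conn is a placeholder client type, not a Beam API):

import org.apache.beam.sdk.transforms.DoFn;

// Placeholder client for the sketch.
class Conn {
  static Conn open() { return new Conn(); }
  void write(String s) {}
  void close() {}
}

// The connection is opened once per bundle and shared by every element in
// it, rather than opened once per element.
class BundleScopedWriteFn extends DoFn<String, Void> {
  private transient Conn conn;

  @StartBundle
  public void startBundle() {
    conn = Conn.open();
  }

  @ProcessElement
  public void process(@Element String element) {
    conn.write(element);
  }

  @FinishBundle
  public void finishBundle() {
    conn.close();
    conn = null;
  }
}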

Exposing the shard id is the big question, and whether there is a use case for it. I hope the community can provide guidance here; otherwise I'm with you and also agree that exposing it isn't necessary.

I'm very new to Beam so looking forward to hearing the thoughts from the community. Any comments will be appreciated :)
--
Best regards,
Siyuan


On Tue, Jun 16, 2020 at 3:04 PM Luke Cwik <lcwik@google.com> wrote:

My first idea was to use a connection pool that is shared across the entire worker across multiple bundles. The connection pool would TTL connections that have been unused. This would help a bunch up until you hit the problem where you don't want every worker connected to every resource because of sharding of the work. In this case we should really be making sure that workers that have processed the same "key" process the same "key" again without limiting the number of workers that can process a specific key. This is very similar to what we do with a stateful DoFn but one where the runner knows that it can "shard" the key. +Siyuan Chen has been investigating something like this for Dataflow to solve scalability issues with the BigQuery sink and has been looking into how a better GroupIntoBatches and/or sharded stateful DoFn could really help in these situations. This applies in general to lots of things where we want to co-locate things with the same key but not limit the parallel processing to only a single worker like stateful DoFn does today.
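A sketch of that worker-wide TTL pool, here using Guava's CacheBuilder for the eviction (PooledConn is a placeholder client type; note Guava evicts lazily, so close may be deferred until the cache is next touched):

import java.util.concurrent.TimeUnit;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

// Placeholder client for the sketch.
class PooledConn {
  static PooledConn open(String spec) { return new PooledConn(); }
  void close() {}
}

// One pool per worker JVM (static), shared across bundles and DoFn
// instances; connections unused for 5 minutes are evicted and closed.
class ConnectionPool {
  private static final LoadingCache<String, PooledConn> POOL =
      CacheBuilder.newBuilder()
          .expireAfterAccess(5, TimeUnit.MINUTES)
          .<String, PooledConn>removalListener(n -> n.getValue().close())
          .build(CacheLoader.from(PooledConn::open));

  static PooledConn acquire(String spec) {
    return POOL.getUnchecked(spec);
  }
}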

On Tue, Jun 16, 2020 at 2:44 PM Ismaël Mejía <iemejia@gmail.com> wrote:

We have been promoting the use of DoFn to write IO connectors for many reasons, including better composability. A common pattern that arises in such IOs is that a preceding transform prepares the specification element on split that a subsequent DoFn uses to read the data. You can see an example of this in FileIO [1] or in RedisIO [2].

The issue is that if we process that spec in the `@ProcessElement` method we lose the DoFn lifecycle: we cannot establish a connection in `@Setup` and close it in `@Teardown` because the spec is per element, so we end up re-creating connections, which is quite a costly operation in some systems like Cassandra/HBase/etc., and it could end up saturating the data store because of the massive creation of connections (something that already happened in the past with JdbcIO in the streaming case).
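Sketched, the problem shape is (ReadFn and the client type are illustrative):

import org.apache.beam.sdk.transforms.DoFn;

// Placeholder client for the sketch.
class SpecClient {
  static SpecClient connect(String spec) { return new SpecClient(); }
  String read() { return ""; }
  void close() {}
}

// The spec arrives as the element itself, so @Setup (which runs before any
// element is seen) cannot open the connection; it gets opened and torn
// down once per element instead of once per DoFn instance.
class ReadFn extends DoFn<String, String> {
  @ProcessElement
  public void process(@Element String spec, OutputReceiver<String> out) {
    SpecClient client = SpecClient.connect(spec); // costly, repeated per element
    try {
      out.output(client.read());
    } finally {
      client.close();
    }
  }
}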

In the ongoing PR that transforms Cassandra to be DoFn based [3] this subject appeared again, and we were discussing how to eventually reuse connections, maybe by a pretty naive approach of saving a previous connection (or set of identified connections) statically so it can be reused by multiple DoFn instances. We already had some issues in the past because of creating many connections on other IOs (JdbcIO) with streaming pipelines where databases were swamped by massive amounts of connections, so reusing connections seems to be something that matters, but at the moment we do not have a clear way to do this better.
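A minimal sketch of that static-sharing idea (illustrative only; a real version would still need TTLs or reference counting so idle connections eventually close):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Placeholder client for the sketch.
class SharedConn {
  static SharedConn connect(String spec) { return new SharedConn(); }
}

// Static map, so every DoFn instance in the worker JVM shares one
// connection per distinct spec; computeIfAbsent makes the open atomic.
class SharedConnections {
  private static final ConcurrentMap<String, SharedConn> CONNS = new ConcurrentHashMap<>();

  static SharedConn get(String spec) {
    return CONNS.computeIfAbsent(spec, SharedConn::connect);
  }
}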

Anyone have better ideas or recommendations for this scenario?
Thanks in advance.

Ismaël

[1] https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L260
[2] https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L471
[3] https://github.com/apache/beam/pull/10546