From dev-return-9532-archive-asf-public=cust-asf.ponee.io@beam.apache.org Fri May 4 18:49:05 2018
From: Chamikara Jayalath
Date: Fri, 04 May 2018 16:48:45 +0000
Subject: Re: I want to allow a user-specified QuerySplitter for DatastoreIO
To: dev@beam.apache.org
Hi Frank,

On Thu, May 3, 2018 at 1:07 PM Lukasz Cwik wrote:

> I also like the idea of doing the splitting when the pipeline is running
> and not during pipeline construction. This works a lot better with things
> like templates.
>
> Do you know which Maven package contains the com.google.rpc classes, and
> what the transitive dependency tree of that package is?
>
> If those dependencies are already exposed (or not complex), then adding
> com.google.rpc to the API surface whitelist will be a non-issue.
>
> On Thu, May 3, 2018 at 8:28 AM Frank Yellin wrote:
>
>> I actually tried (1), and ran precisely into the size limit that you
>> mentioned. Because of the size of the database, I needed to split it
>> into a few hundred shards, and that was more than the request limit.

Have you tried adding a Reshuffle transform after reading from Datastore?
Even if you have a smaller number of initial shards, a reshuffle can help
significantly by allowing the steps that follow the read to be parallelized
further.

https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L65
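A minimal sketch of that setup, assuming the standard DatastoreIO v1 read
API; the project ID and the "ledger" kind below are placeholders:

import com.google.datastore.v1.Entity;
import com.google.datastore.v1.KindExpression;
import com.google.datastore.v1.Query;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

public class ReshuffleAfterDatastoreRead {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Placeholder query over a placeholder "ledger" kind.
    Query query =
        Query.newBuilder()
            .addKind(KindExpression.newBuilder().setName("ledger"))
            .build();

    PCollection<Entity> entities =
        pipeline.apply(
            "ReadFromDatastore",
            DatastoreIO.v1().read().withProjectId("my-project").withQuery(query));

    // Reshuffle breaks fusion between the read and the steps that follow it,
    // so downstream work is parallelized beyond the number of initial splits.
    PCollection<Entity> reshuffled =
        entities.apply("BreakFusion", Reshuffle.viaRandomKey());

    // ...downstream ParDos consume 'reshuffled'...

    pipeline.run();
  }
}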
>> I was also considering a slightly different alternative to (2), such as
>> adding setQueries() or setSplitterPTransform(). The semantics would be
>> identical to those of your ReadAll, but I'd be able to reuse more of the
>> code that is already there. This gave me interesting results, but it
>> wasn't as powerful as what I needed. See (2) below.

Could you explain how these would be semantically equivalent to ReadAll?
With the ReadAll transform, the flow would be something like the following:

    pipeline.apply(ParDo(MyDoFnThatSplitsQueries())).apply(DatastoreIO.ReadAll())

'MyDoFnThatSplitsQueries' would be your custom DoFn that performs the
splitting (into as many splits as you want).
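Spelled out as a sketch — note that DatastoreIO's ReadAll is hypothetical
here; it did not exist at the time of this thread, which is exactly what
option (2) below proposes to add:

import com.google.datastore.v1.Query;
import java.util.Collections;
import org.apache.beam.sdk.transforms.DoFn;

// A user-provided DoFn that turns one seed query into many sub-queries; the
// splitting strategy (here a time-range stub) is entirely up to the user.
class MyDoFnThatSplitsQueries extends DoFn<Query, Query> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    for (Query subQuery : splitByTimeRange(c.element())) {
      c.output(subQuery);
    }
  }

  private Iterable<Query> splitByTimeRange(Query query) {
    // Placeholder for user-defined splitting, e.g. one sub-query per
    // creationTime slice.
    return Collections.singletonList(query);
  }
}

// Wiring, given a hypothetical ReadAll transform on DatastoreIO:
//
//   pipeline
//       .apply(Create.of(seedQuery))
//       .apply(ParDo.of(new MyDoFnThatSplitsQueries()))
//       .apply(DatastoreIO.v1().readAll());   // hypothetical; see (2) below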
>> The two specific use cases motivating me were that I needed to write
>> code that could
>>       (1) delete a property from all Entitys whose creationTime is
>>           between one month and two months ago, and
>>       (2) delete all Entitys whose creationTime is more than two years
>>           ago.
>> I think these are common-enough operations. For a very large database,
>> it would be nice to be able to read only the small piece of it that is
>> needed for your operation.

Have you considered adding a filter ParDo that follows the read? I
understand that this would increase the amount of data that you read, but
I still prefer not allowing users to customize splitting, due to the
serious issues I mentioned previously. Regarding deletion, I don't think
the source is the right place for that. We provide a separate transform
for deletion. Can you try to use that?

https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L1009
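A minimal sketch of that read-filter-delete combination for use case (2),
assuming creationTime is stored as a timestamp property; the property name,
project ID, and cutoff are placeholders:

import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Value;
import org.apache.beam.sdk.transforms.DoFn;

// Keeps only entities whose creationTime timestamp is older than the given
// cutoff (seconds since the epoch); everything else is dropped.
class OlderThanCutoffFn extends DoFn<Entity, Entity> {
  private final long cutoffSeconds;

  OlderThanCutoffFn(long cutoffSeconds) {
    this.cutoffSeconds = cutoffSeconds;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    Value creationTime = c.element().getPropertiesMap().get("creationTime");
    if (creationTime != null
        && creationTime.getTimestampValue().getSeconds() < cutoffSeconds) {
      c.output(c.element());
    }
  }
}

// Wiring: read broadly, filter in the data plane, then hand the survivors
// to the deletion transform linked above:
//
//   pipeline
//       .apply(DatastoreIO.v1().read()
//           .withProjectId("my-project").withQuery(query))
//       .apply(ParDo.of(new OlderThanCutoffFn(twoYearCutoffSeconds)))
//       .apply(DatastoreIO.v1().deleteEntity().withProjectId("my-project"));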
>> The first is easy to handle: I know the start and end of creationTime,
>> and I can shard it myself. The second requires me to consult the
>> datastore to find out what the smallest creationTime in the datastore
>> is, and then use that as an advisory (not hard) lower limit; the query
>> splitter should work well whether the oldest records are four years old
>> or barely more than two years old. For this to be possible, I need
>> access to the Datastore object, and this Datastore object needs to be
>> passed to some sort of user callback. The QuerySplitter hook already
>> existed and seemed to fit my needs perfectly.
>>
>> Is there a better alternative that still gives me access to the
>> Datastore?
>>
>> On Thu, May 3, 2018 at 2:52 AM, Chamikara Jayalath wrote:
>>
>>> Thanks. IMHO it might be better to perform this splitting as a part of
>>> your pipeline instead of making source splitting customizable. The
>>> reason is that it's easy for users to shoot themselves in the foot if
>>> we allow them to specify a custom splitter. A bug in a custom
>>> QuerySplitter can result in a hard-to-catch data loss or data
>>> duplication bug. So I'd rather not make it a part of the user API.
>>>
>>> I can think of two ways of performing this splitting as a part of your
>>> pipeline.
>>> (1) Split the query during job construction and create a source per
>>>     query. This can be followed by a Flatten transform that creates a
>>>     single PCollection. (One caveat is that you might run into the
>>>     10MB request size limit if you create too many splits here, so try
>>>     reducing the number of splits if you run into this.)
>>> (2) Add a ReadAll transform to DatastoreIO. This would allow you to
>>>     precede the step that performs the reading with a ParDo step that
>>>     splits your query and creates a PCollection of queries. You should
>>>     not run into size limits here, since the splitting happens in the
>>>     data plane.
>>>
>>> Thanks,
>>> Cham
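For comparison with the ReadAll sketch earlier in the thread, a minimal
sketch of option (1), with the construction-time splitting left as a
placeholder and "my-project" assumed:

import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Query;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class SplitAtConstruction {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Stand-in for whatever splitting the user performs at job-construction
    // time; each resulting query becomes its own read below.
    List<Query> splits = splitQuery();

    // One read per split, flattened into a single PCollection. Creating very
    // many reads is what can push the job request over the 10MB size limit.
    List<PCollection<Entity>> reads = new ArrayList<>();
    for (int i = 0; i < splits.size(); i++) {
      reads.add(
          pipeline.apply(
              "Read" + i,
              DatastoreIO.v1()
                  .read()
                  .withProjectId("my-project")
                  .withQuery(splits.get(i))));
    }
    PCollection<Entity> all =
        PCollectionList.of(reads).apply(Flatten.pCollections());

    pipeline.run();
  }

  private static List<Query> splitQuery() {
    return Collections.emptyList(); // placeholder for user-defined splitting
  }
}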
>>> On Wed, May 2, 2018 at 12:50 PM Frank Yellin wrote:
>>>
>>>> TL;DR:
>>>> Is it okay for me to expose Datastore in Apache Beam's DatastoreIO,
>>>> and thus indirectly expose com.google.rpc.Code?
>>>> Is there a better solution?
>>>>
>>>> As I explain in BEAM-4186, I would like to be able to extend
>>>> DatastoreV1.Read with a
>>>>       withQuerySplitter(QuerySplitter querySplitter)
>>>> method, which would use an alternative query splitter. The standard
>>>> one shards by key and is very limited.
>>>>
>>>> I have already written such a query splitter. In fact, the query
>>>> splitter I've written goes further than what Beam specifies: it reads
>>>> the minimum or maximum value of the field from the datastore if no
>>>> minimum or maximum is specified in the query, and uses that value for
>>>> the sharding. I can write
>>>>       SELECT * FROM ledger WHERE type = 'purchase'
>>>> and then ask it to shard on eventTime, and it will shard nicely! I am
>>>> working with the Datastore folks to separately add my new query
>>>> splitter as an option in DatastoreHelper.
>>>>
>>>> I have already written the code to add withQuerySplitter:
>>>>
>>>>       https://github.com/apache/beam/pull/5246
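For context, the QuerySplitter interface under discussion comes from
com.google.datastore.v1.client; a custom implementation plus the proposed
withQuerySplitter hook (from the PR above, not part of released Beam) would
look roughly like this sketch:

import com.google.datastore.v1.PartitionId;
import com.google.datastore.v1.Query;
import com.google.datastore.v1.client.Datastore;
import com.google.datastore.v1.client.DatastoreException;
import com.google.datastore.v1.client.QuerySplitter;
import java.util.List;

// A custom splitter that shards on a property (e.g. eventTime) rather than
// on keys. The Datastore handle in the signature is the crux of the thread:
// it lets the splitter look up min/max property values when the query does
// not bound them, and it is also what drags DatastoreException (and hence
// com.google.rpc.Code) into Beam's API surface.
class PropertyRangeQuerySplitter implements QuerySplitter {
  @Override
  public List<Query> getSplits(
      Query query, PartitionId partition, int numSplits, Datastore datastore)
      throws DatastoreException {
    // Placeholder: consult 'datastore' for the property's min/max, then emit
    // numSplits sub-queries covering consecutive ranges of that property.
    throw new UnsupportedOperationException("sketch only");
  }
}

// Proposed wiring from the PR:
//
//   DatastoreIO.v1().read()
//       .withProjectId("my-project")
//       .withQuery(query)
//       .withQuerySplitter(new PropertyRangeQuerySplitter());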
>>>> However, the problem is that I am increasing the "API surface" of
>>>> Dataflow:
>>>>       QuerySplitter exposes Datastore, which exposes
>>>>       DatastoreException, which exposes com.google.rpc.Code,
>>>> and com.google.rpc.Code is not (yet) part of the API surface.
>>>>
>>>> As a solution, I've added the package com.google.rpc to the list of
>>>> classes exposed. This package contains protobuf enums. Is this okay?
>>>> Is there a better solution?
>>>>
>>>> Thanks.