beam-dev mailing list archives

From Chamikara Jayalath <chamik...@google.com>
Subject Re: I want to allow a user-specified QuerySplitter for DatastoreIO
Date Fri, 04 May 2018 16:48:45 GMT
Hi Frank,

On Thu, May 3, 2018 at 1:07 PM Lukasz Cwik <lcwik@google.com> wrote:

> I also like the idea of doing the splitting when the pipeline is running
> and not during pipeline construction. This works a lot better with things
> like templates.
>
> Do you know what Maven package contains com.google.rpc classes and what is
> the transitive dependency tree of the package?
>
> If those dependencies are already exposed (or not complex) then adding
> com.google.rpc to the API surface whitelist will be a non-issue.
>
> On Thu, May 3, 2018 at 8:28 AM Frank Yellin <fy@fyellin.com> wrote:
>
>> I actually tried (1), and ran precisely into the size limit that you
>> mentioned.  Because of the size of the database, I needed to split it into
>> a few hundred shards, and that was more than the request limit.
>>
>
Have you tried adding a Reshuffle transform after reading from Datastore?
Even if you start with a small number of initial shards, a reshuffle can
significantly help by allowing the steps that follow to be parallelized
further.

https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L65
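
For concreteness, a minimal sketch of that shape (assuming a Pipeline
'pipeline' and a Datastore 'query' already in scope; the project ID is a
placeholder):

    import com.google.datastore.v1.Entity;
    import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.values.PCollection;

    // Read with the default splitting, then break fusion so the steps
    // that follow can be parallelized beyond the initial shard count.
    PCollection<Entity> entities =
        pipeline
            .apply(DatastoreIO.v1().read()
                .withProjectId("my-project")   // placeholder
                .withQuery(query))             // your Datastore query
            .apply(Reshuffle.viaRandomKey());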


>
>> I was also considering a slightly different alternative to (2), such as
>> adding setQueries() or setSplitterPTransform().  The semantics would be
>> identical to that of your ReadAll, but I'd be able to reuse more of the
>> code that is there.  This gave me interesting results, but it wasn't as
>> powerful as what I needed.  See (2) below.
>>
>>
Could you explain how these would be semantically equivalent to ReadAll?
With the ReadAll transform, the flow would be something like the following:

pipeline.apply(ParDo.of(new MyDoFnThatSplitsQueries())).apply(DatastoreIO.readAll());

'MyDoFnThatSplitsQueries' would be your custom DoFn that performs the
splitting (producing as many splits as you want), as sketched below.
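
A hypothetical sketch of such a DoFn (ReadAll is the proposed transform,
not an existing API, and splitByCreationTime stands in for whatever
partitioning scheme fits your data):

    import com.google.datastore.v1.Query;
    import org.apache.beam.sdk.transforms.DoFn;

    // Takes one query and emits N sub-queries; the (proposed) ReadAll
    // transform then reads them in parallel in the data plane.
    class MyDoFnThatSplitsQueries extends DoFn<Query, Query> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // splitByCreationTime is a placeholder for your own logic.
        for (Query subQuery : splitByCreationTime(c.element(), 100)) {
          c.output(subQuery);
        }
      }
    }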



> The two specific use cases that were motivating me were that I needed to
>> write code that could
>>       (1) delete a property from all Entitys whose creationTime is
>> between one month and two months ago.
>>       (2) delete all Entitys whose creationTime is more than two years
>> ago.
>> I think these are common-enough operations.  For a very large database,
>> it would be nice to be able to read only the small piece of it that is
>> needed for your operation.
>>
>>
Have you considered adding a filter ParDo that follows the read? I
understand that this would increase the amount of data that you read, but I
still prefer not allowing users to customize splitting, due to the serious
issues I mentioned previously. Regarding deletion, I don't think the source
is the right place for that. We provide a separate transform for deletion.
Can you try to use that?

https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L1009
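
In other words, something along these lines (a sketch; the project ID and
the isOlderThanTwoYears predicate over creationTime are placeholders):

    import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
    import org.apache.beam.sdk.transforms.Filter;

    // Read broadly, keep only the entities older than the cutoff, and
    // hand them to the existing delete transform.
    pipeline
        .apply(DatastoreIO.v1().read()
            .withProjectId("my-project")    // placeholder
            .withQuery(query))
        .apply(Filter.by(entity -> isOlderThanTwoYears(entity)))
        .apply(DatastoreIO.v1().deleteEntity()
            .withProjectId("my-project"));  // placeholder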



>> The first is easy to handle.  I know the start and end of creationTime,
>> and I can shard it myself.  The second requires me to consult the datastore
>> to find out what the smallest creationTime is, and then
>> use it as an advisory (not hard) lower limit; the query splitter should
>> work well whether the oldest records were four years old or barely more
>> than two years old.   For this to be possible, I need access to the
>> Datastore object, and this Datastore object needs to be passed as some sort
>> of user callback.  The QuerySplitter hook already existed and seemed to fit
>> my needs perfectly.
>>
>> Is there a better alternative that still gives me access to the Datastore?
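
For reference, the hook being described has roughly this shape (a sketch;
lookupMinCreationTime and shardByCreationTime are placeholders for the
advisory-bound logic):

    import com.google.datastore.v1.PartitionId;
    import com.google.datastore.v1.Query;
    import com.google.datastore.v1.client.Datastore;
    import com.google.datastore.v1.client.DatastoreException;
    import com.google.datastore.v1.client.QuerySplitter;
    import com.google.protobuf.Timestamp;
    import java.util.List;

    // A custom QuerySplitter that consults the Datastore for the smallest
    // creationTime when the query has no lower bound, then uses it as an
    // advisory (not hard) limit for sharding.
    class CreationTimeQuerySplitter implements QuerySplitter {
      @Override
      public List<Query> getSplits(
          Query query, PartitionId partition, int numSplits, Datastore datastore)
          throws DatastoreException {
        Timestamp min = lookupMinCreationTime(query, datastore);  // placeholder
        return shardByCreationTime(query, min, numSplits);        // placeholder
      }
    }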
>>
>> On Thu, May 3, 2018 at 2:52 AM, Chamikara Jayalath <chamikara@google.com>
>> wrote:
>>
>>> Thanks. IMHO it might be better to perform this splitting as a part of
>>> your pipeline instead of making source splitting customizable. The reason
>>> is, it's easy for users to shoot themselves in the foot if we allow them to
>>> specify a custom splitter. A bug in a custom QuerySplitter can result in
>>> a hard-to-catch data loss or data duplication bug. So I'd rather not make
>>> it a part of the user API.
>>>
>>> I can think of two ways for performing this splitting as a part of your
>>> pipeline.
>>> (1) Split the query during job construction and create a source per
>>> query. This can be followed by a Flatten transform that creates a single
>>> PCollection; see the sketch after this list. (One caveat is, you might run
>>> into the 10MB request size limit if you create too many splits here, so try
>>> reducing the number of splits if you run into this.)
>>> (2) Add a ReadAll transform to DatastoreIO. This will allow you to
>>> precede the step that performs reading with a ParDo step that splits your
>>> query and creates a PCollection of queries. You should not run into size
>>> limits here, since splitting happens in the data plane.
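
A rough sketch of option (1), where mySplitQuery is your own
construction-time splitting logic and the project ID is a placeholder:

    import com.google.datastore.v1.Entity;
    import com.google.datastore.v1.Query;
    import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;

    // One read per pre-split query, flattened into a single PCollection.
    PCollectionList<Entity> shards = PCollectionList.empty(pipeline);
    int i = 0;
    for (Query subQuery : mySplitQuery(query, 50)) {
      shards = shards.and(
          pipeline.apply("Read" + (i++),   // unique step names
              DatastoreIO.v1().read()
                  .withProjectId("my-project")
                  .withQuery(subQuery)));
    }
    PCollection<Entity> all = shards.apply(Flatten.pCollections());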
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Wed, May 2, 2018 at 12:50 PM Frank Yellin <fy@fyellin.com> wrote:
>>>
>>>> TLDR:
>>>> Is it okay for me to expose Datastore in Apache Beam's DatastoreIO,
>>>> and thus indirectly expose com.google.rpc.Code?
>>>> Is there a better solution?
>>>>
>>>>
>>>> As I explain in BEAM-4186
>>>> <https://issues.apache.org/jira/browse/BEAM-4186>, I would like to be
>>>> able to extend DatastoreV1.Read to have a
>>>>        withQuerySplitter(QuerySplitter querySplitter)
>>>> method, which would use an alternative query splitter.  The standard
>>>> one shards by key and is very limited.
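
Under the proposed API, usage would look something like this (a sketch;
MyQuerySplitter is a custom implementation of the existing
com.google.datastore.v1.client.QuerySplitter interface):

    // Proposed hook, not yet in Beam (see the PR below).
    PCollection<Entity> entities =
        pipeline.apply(DatastoreIO.v1().read()
            .withProjectId("my-project")   // placeholder
            .withQuery(query)
            .withQuerySplitter(new MyQuerySplitter()));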
>>>>
>>>> I have already written such a query splitter.  In fact, the query
>>>> splitter I've written goes further than what is specified in the Beam
>>>> issue: it reads the minimum or maximum value of the field from the
>>>> datastore if no minimum or maximum is specified in the query, and uses
>>>> that value for the sharding.  I can write:
>>>>        SELECT * FROM ledger where type = 'purchase'
>>>> and then ask it to shard on the eventTime, and it will shard nicely!
>>>> I am working with the Datastore folks to separately add my new query
>>>> splitter as an option in DatastoreHelper.
>>>>
>>>>
>>>> I have already written the code to add withQuerySplitter.
>>>>
>>>>        https://github.com/apache/beam/pull/5246
>>>>
>>>> However, the problem is that I am increasing the "API surface" of
>>>> Beam:
>>>>        QuerySplitter exposes Datastore, which exposes DatastoreException,
>>>>        which exposes com.google.rpc.Code,
>>>> and com.google.rpc.Code is not (yet) part of the API surface.
>>>>
>>>> As a solution, I've added the package com.google.rpc to the list of
>>>> classes exposed.  This package contains protobuf enums.  Is this okay?
>>>> Is there a better solution?
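
For reference, the whitelist change would look roughly like this (a sketch
of the GCP API-surface test; the helper names follow Beam's ApiSurface
utilities and the exact set of packages may differ between versions):

    // Approximate shape of the allowed-classes set in the GCP
    // API-surface test, with com.google.rpc newly whitelisted.
    final Set<Matcher<Class<?>>> allowedClasses =
        ImmutableSet.of(
            classesInPackage("com.google.api"),
            classesInPackage("com.google.datastore.v1"),
            classesInPackage("com.google.protobuf"),
            // Newly exposed: protobuf-generated enums such as
            // com.google.rpc.Code, reachable via QuerySplitter ->
            // Datastore -> DatastoreException.
            classesInPackage("com.google.rpc"));
    assertThat(apiSurface, containsOnlyClassesMatching(allowedClasses));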
>>>>
>>>> Thanks.
>>>>
>>>>
>>
