beam-dev mailing list archives

From: Chamikara Jayalath <>
Subject: Re: I want to allow a user-specified QuerySplitter for DatastoreIO
Date: Fri, 04 May 2018 16:48:45 GMT
Hi Frank,

On Thu, May 3, 2018 at 1:07 PM Lukasz Cwik <> wrote:

> I also like the idea of doing the splitting when the pipeline is running
> and not during pipeline construction. This works a lot better with things
> like templates.
> Do you know what Maven package contains classes and what is
> the transitive dependency tree of the package?
> If those dependencies are already exposed (or not complex) then adding
> to the API surface whitelist will be a non-issue.
> On Thu, May 3, 2018 at 8:28 AM Frank Yellin <> wrote:
>> I actually tried (1), and ran precisely into the size limit that you
>> mentioned.  Because of the size of the database, I needed to split it into
>> a few hundred shards, and that was more than the request limit.
Have you tried adding a Reshuffle transform after reading from Datastore ?
Even if you start with a smaller number of initial shards, a reshuffle can
significantly help by allowing the subsequent steps to be parallelized further.
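
For example, a rough sketch with the Java SDK; the project id, query, and
downstream DoFn below are placeholders, not things you already have under
those names:

    import com.google.datastore.v1.Entity;
    import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.values.PCollection;

    PCollection<Entity> entities =
        pipeline
            .apply(DatastoreIO.v1().read()
                .withProjectId("my-project")        // placeholder project id
                .withQuery(query))                  // your existing Query
            // Redistribute the read entities so the following steps can fan
            // out beyond the initial number of splits.
            .apply(Reshuffle.viaRandomKey());

    entities.apply(ParDo.of(new MyProcessingFn()));  // placeholder downstream DoFn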

>> I was also considering a slightly different alternative to (2), such as
>> adding setQueries(), or setSplitterPTransform().  The semantics would be
>> identical to that of your ReadAll, but I'd be able to reuse more of the
>> code that is there.  This gave me interesting results, but it wasn't as
>> powerful as what I needed.  See (2) below.
Could you explain how these would be semantically equivalent to ReadAll ?
With the ReadAll transform the flow would be something like the following.
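(Rough sketch only: DatastoreIO does not have a readAll() today, so the
transform below is the proposed one rather than an existing API; the project
id and base query are placeholders, and imports are as in the sketch above,
plus org.apache.beam.sdk.transforms.Create.)

    PCollection<Entity> entities =
        pipeline
            .apply(Create.of(baseQuery))            // the single user query
            // Fan the one query out into many sub-queries.
            .apply("SplitQuery", ParDo.of(new MyDoFnThatSplitsQueries()))
            // Proposed transform: reads a PCollection of queries, so the
            // splitting happens in the data plane rather than at construction.
            .apply(DatastoreIO.v1().readAll()
                .withProjectId("my-project"));      // placeholder project id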


'MyDoFnThatSplitsQueries' would be your custom DoFn that performs splitting
(into as many splits as you want).

>> The two specific use cases that were motivating me were that I needed to
>> write code that could
>>       (1) delete a property from all Entitys whose creationTime is
>> between one month and two months ago.
>>       (2) delete all Entitys whose creationTime is more than two years
>> ago.
>> I think these are common-enough operations.  For a very large database,
>> it would be nice to be able to read only the small piece of it that is
>> needed for your operation.
Have you considered adding a filter ParDo that follows the read ? I
understand that this would increase the amount of data that you read, but I
still prefer not allowing users to customize splitting because of the serious
issues I previously mentioned. Regarding deletion, I don't think the source is
the right place for that. We provide a separate transform for deletion. Can
you try to use that ?
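
For your second use case, that combination might look roughly like the
following sketch; the "creationTime" property, the cutoff, and the project id
are placeholders, and imports are as in the first sketch, plus
org.apache.beam.sdk.transforms.DoFn:

    // Epoch-second cutoff, roughly two years before now.
    final long cutoffSeconds =
        java.time.Instant.now().minus(java.time.Duration.ofDays(730)).getEpochSecond();

    PCollection<Entity> expired =
        pipeline
            .apply(DatastoreIO.v1().read()
                .withProjectId("my-project")
                .withQuery(query))                  // e.g. everything in the kind
            .apply("FilterOld", ParDo.of(new DoFn<Entity, Entity>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                com.google.datastore.v1.Value created =
                    c.element().getPropertiesMap().get("creationTime");
                if (created != null
                    && created.getTimestampValue().getSeconds() < cutoffSeconds) {
                  c.output(c.element());
                }
              }
            }));

    // Existing transform that deletes the given entities from Datastore.
    expired.apply(DatastoreIO.v1().deleteEntity().withProjectId("my-project"));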

>> The first is easy to handle.  I know the start and end of creationTime,
>> and I can shard it myself.  The second requires me to consult the datastore
>> to find out what the smallest creationTime is in the datastore, and then
>> use it as an (advisory, not hard) lower limit; the query splitter should
>> work well whether the oldest records were four years old or barely more
>> than two years old.   For this to be possible, I need access to the
>> Datastore object, and this Datastore object needs to be passed to some sort
>> of user callback.  The QuerySplitter hook already existed and seemed to fit
>> my needs perfectly.
>> Is there a better alternative that still gives me access to the Datastore?
>> On Thu, May 3, 2018 at 2:52 AM, Chamikara Jayalath <>
>> wrote:
>>> Thanks. IMHO it might be better to perform this splitting as a part of
>>> your pipeline instead of making source splitting customizable. The reason
>>> is, it's easy for users to shoot themselves in the foot if we allow
>>> specifying a custom splitter. A bug in a custom QuerySplitter can result in
>>> a hard-to-catch data loss or data duplication bug. So I'd rather not make
>>> it a part of the user API.
>>> I can think of two ways for performing this splitting as a part of your
>>> pipeline.
>>> (1) Split the query during job construction and create a source per
>>> query. This can be followed by a Flatten transform that creates a single
>>> PCollection. (One caveat is, you might run into the 10MB request size limit
>>> if you create too many splits here. So try reducing the number of splits if
>>> you run into this.)
>>> (2) Add a ReadAll transform to DatastoreIO. This will allow you to
>>> precede the step that performs reading by a ParDo step that splits your
>>> query and creates a PCollection of queries. You should not run into size
>>> limits here since splitting happens in the data plane.
>>> Thanks,
>>> Cham
>>> On Wed, May 2, 2018 at 12:50 PM Frank Yellin <> wrote:
>>>> TLDR:
>>>> Is it okay for me to expose Datastore in Apache Beam's DatastoreIO,
>>>> and thus indirectly expose
>>>> Is there a better solution?
>>>> As I explain in Beam 4186
>>>> <>, I would like to be
>>>> able to extend DatastoreV1.Read to have a
>>>>        withQuerySplitter(QuerySplitter querySplitter)
>>>> method, which would use an alternative query splitter.   The standard
>>>> one shards by key and is very limited.
>>>> I have already written such a query splitter.  In fact, the query
>>>> splitter I've written goes further than what is specified in the Beam issue, and reads
>>>> the minimum or maximum value of the field from the datastore if no minimum
>>>> or maximum is specified in the query, and uses that value for the
>>>> sharding.   I can write:
>>>>        SELECT * FROM ledger where type = 'purchase'
>>>> and then ask it to shard on the eventTime, and it will shard nicely!
>>>> I am working with the Datastore folks to separately add my new query
>>>> splitter as an option in DatastoreHelper.
>>>> I have already written the code to add withQuerySplitter.
>>>> However the problem is that I am increasing the "surface API" of
>>>> Dataflow.
>>>>        QuerySplitter exposes Datastore  exposes DatastoreException
>>>> exposes
>>>> and is not (yet) part of the API surface.
>>>> As a solution, I've added package to the list of classes
>>>> exposed.  This package contains protobuf enums.  Is this okay?  Is there
>>>> a better solution?
>>>> Thanks.
