beam-dev mailing list archives

From Lukasz Cwik <>
Subject Re: I want to allow a user-specified QuerySplitter for DatastoreIO
Date Thu, 03 May 2018 20:07:18 GMT
I also like the idea of doing the splitting when the pipeline is running
and not during pipeline construction. This works a lot better with things
like templates.

Do you know what Maven package contains classes and what is
the transitive dependency tree of the package?

If those dependencies are already exposed (or not complex) then adding to the API surface whitelist will be a non-issue.

On Thu, May 3, 2018 at 8:28 AM Frank Yellin <> wrote:

> I actually tried (1), and ran precisely into the size limit that you
> mentioned.  Because of the size of the database, I needed to split it into
> a few hundred shards, and that was more than the request limit.
> I was also considering a slightly different alternative to (2), such as
> adding setQueries(), or setSplitterPTransform().  The semantics would be
> identical to that of your ReadAll, but I'd be able to reuse more of the
> code that is there.  This gave me interesting results, but it wasn't as
> powerful as what I needed.  See (2) below.
> The two specific use cases that were motivating me were that I needed to
> write code that could
>       (1) delete a property from all Entities whose creationTime is between
> one month and two months ago.
>       (2) delete all Entities whose creationTime is more than two years ago.
> I think these are common-enough operations.  For a very large database, it
> would be nice to be able to read only the small piece of it that is needed
> for your operation.
> The first is easy to handle.  I know the start and end of creationTime,
> and I can shard it myself.  The second requires me to consult the datastore
> to find out what the smallest creationTime is in the datastore, and then
> use it as an advisory (not hard) lower limit; the query splitter should
> work well whether the oldest records were four years old or barely more
> than two years old.   For this to be possible, I need access to the
> Datastore object, and this Datastore object needs to be passed as some sort
> of user callback.  The QuerySplitter hook already existed and seemed to fit
> my needs perfectly.
> Is there a better alternative that still gives me access to the Datastore?
> On Thu, May 3, 2018 at 2:52 AM, Chamikara Jayalath <>
> wrote:
>> Thanks. IMHO it might be better to perform this splitting as a part of
>> your pipeline instead of making source splitting customizable. The reason
>> is, it's easy for users to shoot themselves in the foot if we allow
>> specifying a custom splitter. A bug in a custom QuerySplitter can result in
>> a hard-to-catch data loss or data duplication bug. So I'd rather not make
>> it a part of the user API.
>> I can think of two ways for performing this splitting as a part of your
>> pipeline.
>> (1) Split the query during job construction and create a source per
>> query. This can be followed by a Flatten transform that creates a single
>> PCollection. (One caveat: you might run into the 10MB request size limit
>> if you create too many splits here, so try reducing the number of splits
>> if you run into this.)
>> (2) Add a ReadAll transform to DatastoreIO. This will allow you to
>> precede the step that performs reading by a ParDo step that splits your
>> query and creates a PCollection of queries. You should not run into size
>> limits here since splitting happens in the data plane.
>> Thanks,
>> Cham
>> On Wed, May 2, 2018 at 12:50 PM Frank Yellin <> wrote:
>>> TLDR:
>>> Is it okay for me to expose Datastore in Apache Beam's DatastoreIO, and
>>> thus indirectly expose
>>> Is there a better solution?
>>> As I explain in Beam 4186
>>> <>, I would like to be
>>> able to extend DatastoreV1.Read to have a
>>>        withQuerySplitter(QuerySplitter querySplitter)
>>> method, which would use an alternative query splitter.   The standard
>>> one shards by key and is very limited.
>>> I have already written such a query splitter.  In fact, the query
>>> splitter I've written goes further than specified in the beam, and reads
>>> the minimum or maximum value of the field from the datastore if no minimum
>>> or maximum is specified in the query, and uses that value for the
>>> sharding.   I can write:
>>>        SELECT * FROM ledger where type = 'purchase'
>>> and then ask it to shard on the eventTime, and it will shard nicely!  I
>>> am working with the Datastore folks to separately add my new query splitter
>>> as an option in DatastoreHelper.
>>> I have already written the code to add withQuerySplitter.
>>> However the problem is that I am increasing the "surface API" of
>>> Dataflow.
>>>        QuerySplitter exposes Datastore  exposes DatastoreException
>>> exposes
>>> and is not (yet) part of the API surface.
>>> As a solution, I've added package to the list of classes
>>> exposed.  This package contains protobuf enums.  Is this okay?  Is there a
>>> better solution?
>>> Thanks.
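
The manual sharding Frank describes for use case (1), where the start and end of creationTime are known up front, can be sketched in plain Java as below. This is only an illustration under stated assumptions: TimeRangeSharder, TimeRange and split are hypothetical names, not part of Beam or the Datastore client, and the sketch only computes the sub-ranges; turning each one into a Datastore query filter is left out.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Beam or Datastore API): splits a known time range into shards. */
final class TimeRangeSharder {

    /** A half-open interval [start, end) of creationTime values. */
    static final class TimeRange {
        final Instant start;
        final Instant end;

        TimeRange(Instant start, Instant end) {
            this.start = start;
            this.end = end;
        }
    }

    /**
     * Splits [start, end) into at most numShards contiguous, non-overlapping ranges
     * of roughly equal length. Fewer ranges are returned when the interval is too
     * short to be divided at millisecond granularity.
     */
    static List<TimeRange> split(Instant start, Instant end, int numShards) {
        if (numShards < 1) {
            throw new IllegalArgumentException("numShards must be >= 1");
        }
        if (!start.isBefore(end)) {
            throw new IllegalArgumentException("start must be before end");
        }
        long totalMillis = Duration.between(start, end).toMillis();
        List<TimeRange> shards = new ArrayList<>(numShards);
        Instant lower = start;
        for (int i = 1; i <= numShards; i++) {
            // Derive every boundary from the original start so rounding does not accumulate;
            // the last boundary is pinned to end so the full range is always covered.
            Instant upper = (i == numShards) ? end : start.plusMillis(totalMillis * i / numShards);
            if (upper.isAfter(lower)) {
                shards.add(new TimeRange(lower, upper));
                lower = upper;
            }
        }
        return shards;
    }
}
```

Each returned range would correspond to one query with a lower and an upper bound on creationTime. For use case (2) the lower bound is not known in advance and would have to come from a lookup against the datastore, which this sketch does not attempt.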
