beam-dev mailing list archives

From Frank Yellin
Subject Re: I want to allow a user-specified QuerySplitter for DatastoreIO
Date Thu, 03 May 2018 15:28:03 GMT
I actually tried (1), and ran precisely into the size limit that you
mentioned.  Because of the size of the database, I needed to split it into
a few hundred shards, and that was more than the request limit.

I was also considering a slightly different alternative to (2), such as
adding setQueries(), or setSplitterPTransform().  The semantics would be
identical to that of your ReadAll, but I'd be able to reuse more of the
code that is there.  This gave me interesting results, but it wasn't as
powerful as what I needed.  See (2) below.

The two specific use cases that were motivating me were that I needed to
write code that could
      (1) delete a property from all Entities whose creationTime is between
one month and two months ago.
      (2) delete all Entities whose creationTime is more than two years ago.
I think these are common-enough operations.  For a very large database, it
would be nice to be able to read only the small piece of it that is needed
for your operation.

The first is easy to handle.  I know the start and end of creationTime, and
I can shard it myself.  The second requires me to consult the datastore to
find out what the smallest creationTime is in the datastore, and then use
it as an advisory (not hard) lower limit; the query splitter should
work well whether the oldest records were four years old or barely more
than two years old.   For this to be possible, I need access to the
Datastore object, and this Datastore object needs to be passed as some sort
of user callback.  The QuerySplitter hook already existed and seemed to fit
my needs perfectly.
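
For the first case, the manual sharding could look roughly like the sketch
below. It is an illustration only, not code from DatastoreIO or the Datastore
client: TimeRangeSharder and boundaries() are hypothetical names, and the
sketch assumes equal-width shards are acceptable. Each adjacent pair of
boundaries would become one query with creationTime >= lower and
creationTime < upper.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; not part of Beam or the Datastore client. */
final class TimeRangeSharder {
  private TimeRangeSharder() {}

  /**
   * Returns numShards + 1 boundaries b[0..numShards], with b[0] = start and
   * b[numShards] = end. Shard i covers the half-open range [b[i], b[i+1]).
   */
  static List<Instant> boundaries(Instant start, Instant end, int numShards) {
    if (numShards < 1 || !start.isBefore(end)) {
      throw new IllegalArgumentException("need numShards >= 1 and start < end");
    }
    long totalMillis = Duration.between(start, end).toMillis();
    List<Instant> result = new ArrayList<>(numShards + 1);
    for (int i = 0; i < numShards; i++) {
      // Multiply before dividing so rounding error does not accumulate.
      result.add(start.plusMillis(totalMillis * i / numShards));
    }
    // Use the exact end so truncation to milliseconds never drops data.
    result.add(end);
    return result;
  }
}
```

Because the ranges are half-open and share boundaries, no Entity falls into
two shards or into none. This does not help with the second case, where the
lower bound is not known until the datastore has been consulted.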

Is there a better alternative that still gives me access to the Datastore?

On Thu, May 3, 2018 at 2:52 AM, Chamikara Jayalath wrote:

> Thanks. IMHO it might be better to perform this splitting as a part of
> your pipeline instead of making source splitting customizable. The reason
> is, it's easy for users to shoot themselves in the foot if we allow
> specifying a custom splitter. A bug in a custom QuerySplitter can result in
> a hard-to-catch data loss or data duplication bug. So I'd rather not make
> it a part of the user API.
> I can think of two ways for performing this splitting as a part of your
> pipeline.
> (1) Split the query during job construction and create a source per query.
> This can be followed by a Flatten transform that creates a single
> PCollection. (One caveat is, you might run into the 10MB request size limit
> if you create too many splits here. So try reducing the number of splits if
> you run into this).
> (2) Add a ReadAll transform to DatastoreIO. This will allow you to precede
> the step that performs reading by a ParDo step that splits your query and
> creates a PCollection of queries. You should not run into size limits here
> since splitting happens in the data plane.
> Thanks,
> Cham
> On Wed, May 2, 2018 at 12:50 PM Frank Yellin wrote:
>> TLDR:
>> Is it okay for me to expose Datastore in apache beam's DatastoreIO, and
>> thus indirectly expose
>> Is there a better solution?
>> As I explain in Beam 4186, I would like to be
>> able to extend DatastoreV1.Read to have a
>>        withQuerySplitter(QuerySplitter querySplitter)
>> method, which would use an alternative query splitter.   The standard one
>> shards by key and is very limited.
>> I have already written such a query splitter.  In fact, the query
>> splitter I've written goes further than specified in the beam, and reads
>> the minimum or maximum value of the field from the datastore if no minimum
>> or maximum is specified in the query, and uses that value for the
>> sharding.   I can write:
>>        SELECT * FROM ledger where type = 'purchase'
>> and then ask it to shard on the eventTime, and it will shard nicely!  I
>> am working with the Datastore folks to separately add my new query splitter
>> as an option in DatastoreHelper.
>> I have already written the code to add withQuerySplitter.
>> However the problem is that I am increasing the "surface API" of
>> Dataflow.
>>        QuerySplitter exposes Datastore  exposes DatastoreException
>> exposes
>> and is not (yet) part of the API surface.
>> As a solution, I've added package to the list of classes
>> exposed.  This package contains protobuf enums.  Is this okay?  Is there a
>> better solution?
>> Thanks.
