I actually tried (1), and ran precisely into the size limit that you mentioned.  Because of the size of the database, I needed to split it into a few hundred shards, and that was more than the request limit.

I was also considering a slightly different alternative to (2), such as adding setQueries() or setSplitterPTransform().  The semantics would be identical to that of your ReadAll, but I'd be able to reuse more of the code that is there.  This gave me interesting results, but it wasn't as powerful as what I needed.  See use case (2) below.

The two specific use cases that were motivating me were that I needed to write code that could
      (1) delete a property from all Entities whose creationTime is between one month and two months ago.
      (2) delete all Entities whose creationTime is more than two years ago.
I think these are common-enough operations.  For a very large database, it would be nice to be able to read only the small piece of it that is needed for your operation.

The first is easy to handle.  I know the start and end of creationTime, and I can shard it myself.  The second requires me to consult the datastore to find out what the smallest creationTime is in the datastore, and then use it as an advisory (not hard) lower limit; the query splitter should work well whether the oldest records are four years old or barely more than two years old.  For this to be possible, I need access to the Datastore object, and this Datastore object needs to be passed to some sort of user callback.  The QuerySplitter hook already existed and seemed to fit my needs perfectly.
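
To make the two cases concrete, here is a rough sketch of the range arithmetic only, in plain Java with no Beam or Datastore calls.  The names CreationTimeSharder, Range and split are made up for illustration and are not part of any existing API.  With advisoryStart set, the first shard has no lower bound, so any Entity older than the observed minimum would still land in some shard.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only; not part of Beam or Datastore.
final class CreationTimeSharder {

    /** Half-open range [start, end). A null start means "no lower bound". */
    static final class Range {
        final Instant start; // inclusive, or null when unbounded below
        final Instant end;   // exclusive

        Range(Instant start, Instant end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + (start == null ? "-inf" : start.toString()) + ", " + end + ")";
        }
    }

    /**
     * Splits [start, end) into numShards contiguous, non-overlapping ranges.
     * If advisoryStart is true, start is treated only as a hint for spacing
     * the boundaries, and the first range is left unbounded below.
     */
    static List<Range> split(Instant start, Instant end, int numShards, boolean advisoryStart) {
        long startMs = start.toEpochMilli();
        long spanMs = end.toEpochMilli() - startMs;
        // Requiring at least one millisecond per shard keeps every range non-empty.
        if (numShards < 1 || spanMs < numShards) {
            throw new IllegalArgumentException("range too small for " + numShards + " shards");
        }
        List<Range> shards = new ArrayList<>();
        Instant lower = advisoryStart ? null : start;
        for (int i = 1; i <= numShards; i++) {
            // The last boundary is exactly 'end', so rounding never drops the tail.
            Instant upper = (i == numShards)
                    ? end
                    : Instant.ofEpochMilli(startMs + spanMs * i / numShards);
            shards.add(new Range(lower, upper));
            lower = upper;
        }
        return shards;
    }
}
```

For use case (1) I would call split(twoMonthsAgo, oneMonthAgo, n, false).  For use case (2) I would call split(oldestSeen, twoYearsAgo, n, true), where oldestSeen is whatever the datastore reports as the smallest creationTime.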

Is there a better alternative that still gives me access to the Datastore?

On Thu, May 3, 2018 at 2:52 AM, Chamikara Jayalath <chamikara@google.com> wrote:
Thanks. IMHO it might be better to perform this splitting as a part of your pipeline instead of making source splitting customizable. The reason is, it's easy for users to shoot themselves in the foot if we allow specifying a custom splitter. A bug in a custom QuerySplitter can result in a hard-to-catch data loss or data duplication bug. So I'd rather not make it a part of the user API.

I can think of two ways for performing this splitting as a part of your pipeline.
(1) Split the query during job construction and create a source per query. This can be followed by a Flatten transform that creates a single PCollection. (One caveat is, you might run into the 10MB request size limit if you create too many splits here. So try reducing the number of splits if you run into this).
(2) Add a ReadAll transform to DatastoreIO. This will allow you to precede the step that performs reading by a ParDo step that splits your query and creates a PCollection of queries. You should not run into size limits here since splitting happens in the data plane.


On Wed, May 2, 2018 at 12:50 PM Frank Yellin <fy@fyellin.com> wrote:
Is it okay for me to expose Datastore in Apache Beam's DatastoreIO, and thus indirectly expose com.google.rpc.Code?
Is there a better solution?

As I explain in Beam 4186, I would like to be able to extend DatastoreV1.Read to have a
       withQuerySplitter(QuerySplitter querySplitter)
method, which would use an alternative query splitter.   The standard one shards by key and is very limited.

I have already written such a query splitter.  In fact, the query splitter I've written goes further than specified in the beam, and reads the minimum or maximum value of the field from the datastore if no minimum or maximum is specified in the query, and uses that value for the sharding.   I can write:
       SELECT * FROM ledger where type = 'purchase'
and then ask it to shard on eventTime, and it will shard nicely!  I am working with the Datastore folks to separately add my new query splitter as an option in DatastoreHelper.
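
To illustrate the idea (this is not my actual splitter, which works on Query objects and reads the bounds from the datastore), here is a plain-Java sketch of what the per-shard queries could look like.  ShardedGqlSketch and shardQueries are hypothetical names, the min and max values are passed in rather than looked up, and I am assuming GQL's DATETIME('...') literal and a base query that already has a WHERE clause.  The first and last shards are left open-ended, so the observed min and max act only as hints.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; it only builds strings and never talks to Datastore.
final class ShardedGqlSketch {

    /**
     * Appends range filters on 'field' to baseGql, producing numShards queries
     * that together cover every value of the field.
     * Assumes baseGql already contains a WHERE clause.
     */
    static List<String> shardQueries(
            String baseGql, String field, Instant min, Instant max, int numShards) {
        long minMs = min.toEpochMilli();
        long spanMs = max.toEpochMilli() - minMs;
        if (numShards < 1 || spanMs < numShards) {
            throw new IllegalArgumentException("range too small for " + numShards + " shards");
        }
        List<String> queries = new ArrayList<>();
        String lower = null; // the first shard has no lower bound
        for (int i = 1; i <= numShards; i++) {
            // The last shard has no upper bound, so the record holding 'max' is included.
            String upper = (i == numShards)
                    ? null
                    : Instant.ofEpochMilli(minMs + spanMs * i / numShards).toString();
            StringBuilder q = new StringBuilder(baseGql);
            if (lower != null) {
                q.append(" AND ").append(field).append(" >= DATETIME('").append(lower).append("')");
            }
            if (upper != null) {
                q.append(" AND ").append(field).append(" < DATETIME('").append(upper).append("')");
            }
            queries.add(q.toString());
            lower = upper;
        }
        return queries;
    }
}
```

Each resulting string is the original query plus a filter on eventTime, so the shards never overlap and nothing falls between them.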

I have already written the code to add withQuerySplitter.  

However the problem is that I am increasing the "surface API" of Dataflow.  
       QuerySplitter exposes Datastore, which exposes DatastoreException, which exposes com.google.rpc.Code
and com.google.rpc.Code is not (yet) part of the API surface.

As a solution, I've added package com.google.rpc to the list of classes exposed.  This package contains protobuf enums.  Is this okay?  Is there a better solution?