spark-dev mailing list archives

From Ryan Blue <>
Subject Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2
Date Wed, 06 Sep 2017 23:55:55 GMT
I'm all for keeping this moving and not getting too far into the details
(like naming), but I think the substantial details should be clarified
first since they are in the proposal that's being voted on.

I would prefer moving the write side to a separate SPIP, too, since there
isn't much detail in the proposal and I think we should be more deliberate
with things like schema evolution.

On Thu, Aug 31, 2017 at 10:33 AM, Wenchen Fan <> wrote:

> Hi Ryan,
> I think for a SPIP, we should not worry too much about details, as we can
> discuss them during PR review after the vote passes.
> I think we should focus more on the overall design, like James did. The
> interface mix-in vs plan push down discussion was great; I hope we can reach
> consensus on this topic soon. The current proposal is that we keep the
> interface mix-in framework and add an unstable plan push down trait.
> For details like interface names, sort push down vs sort propagate, etc.,
> I think they should not block the vote, as they can be updated/improved
> within the current interface mix-in framework.
> About separating read/write proposals, we should definitely send
> individual PRs for read/write when developing data source v2. I'm also OK
> with voting on the read side first. The write side is much simpler than the
> read side, so I think it's more important to get agreement on the read side
> first.
> BTW, I do appreciate your feedback/comments on the prototype; let's keep
> the discussion there. In the meantime, let's have more discussion on the
> overall framework, and drive this project together.
> Wenchen
> On Thu, Aug 31, 2017 at 6:22 AM, Ryan Blue <> wrote:
>> Maybe I'm missing something, but the high-level proposal consists of:
>> Goals, Non-Goals, and Proposed API. What is there to discuss other than the
>> details of the API that's being proposed? I think the goals make sense, but
>> goals alone aren't enough to approve a SPIP.
>> On Wed, Aug 30, 2017 at 2:46 PM, Reynold Xin <> wrote:
>>> So we seem to be getting into a cycle of discussing more about the
>>> details of APIs than the high level proposal. The details of APIs are
>>> important to debate, but those belong more in code reviews.
>>> One other important thing is that we should avoid API design by
>>> committee. While it is extremely useful to get feedback and understand the
>>> use cases, we cannot do API design by incorporating verbatim the union of
>>> everybody's feedback. API design is largely a tradeoff game. The most
>>> expressive API would also be harder to use, or sacrifice backward/forward
>>> compatibility. It is as important to decide what to exclude as what to
>>> include.
>>> Unlike the v1 API, the way Wenchen's high level V2 framework is proposed
>>> makes it very easy to add new features (e.g. clustering properties) in the
>>> future without breaking any APIs. I'd rather we ship something useful
>>> that might not be the most comprehensive set than debate every single
>>> feature we should add and then create something super complicated that
>>> has unclear value.
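The extensibility argument can be illustrated with a minimal sketch, assuming Java 16+ (pattern-matching `instanceof`). Every name here (`DataSourceReader`, `SupportsReportClustering`, `SimpleReader`, `ClusteredReader`, `Planner`) is hypothetical and is not the proposed Data Source V2 API. The planner checks for an optional capability with `instanceof`, so a new mix-in can be introduced later without changing sources that never implement it.

```java
import java.util.List;

// Hypothetical base interface that every source implements.
interface DataSourceReader {
    List<String> readRows();
}

// Hypothetical optional mix-in added later: sources that know how their data is
// clustered can report it. Sources written before it existed are unaffected.
interface SupportsReportClustering {
    List<String> clusteringColumns();
}

// A source written before the mix-in existed; it compiles and runs unchanged.
final class SimpleReader implements DataSourceReader {
    public List<String> readRows() { return List.of("row1", "row2"); }
}

// A newer source that opts into the extra capability.
final class ClusteredReader implements DataSourceReader, SupportsReportClustering {
    public List<String> readRows() { return List.of("a1", "a2"); }
    public List<String> clusteringColumns() { return List.of("a"); }
}

final class Planner {
    // Planner-side check (sketch): use the capability only when the mix-in is present.
    static List<String> knownClustering(DataSourceReader reader) {
        if (reader instanceof SupportsReportClustering clustered) {
            return clustered.clusteringColumns();
        }
        return List.of(); // no clustering information: fall back to default planning
    }
}
```

`SimpleReader` never mentions clustering, yet it keeps working after `SupportsReportClustering` is added; only sources that opt in pay for the new feature.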
>>> On Wed, Aug 30, 2017 at 6:37 PM, Ryan Blue <> wrote:
>>>> -1 (non-binding)
>>>> Sometimes it takes a VOTE thread to get people to actually read and
>>>> comment, so thanks for starting this one… but there’s still discussion
>>>> happening on the prototype API, and the proposal hasn’t been updated to
>>>> reflect it. I’d like to see the proposal shaped by the ongoing discussion
>>>> so that we have a better, more concrete plan. I think that’s going to
>>>> produce a better SPIP.
>>>> The second reason for -1 is that I think the read- and write-side
>>>> proposals should be separated. The PR <> currently has “write path”
>>>> listed as a TODO item and most of the discussion I’ve seen is on the read
>>>> side. I think it would be better to separate the read and write APIs so
>>>> we can focus on them individually.
>>>> An example of why we should focus on the write path separately is that
>>>> the proposal says this:
>>>> Ideally partitioning/bucketing concept should not be exposed in the
>>>> Data Source API V2, because they are just techniques for data skipping and
>>>> pre-partitioning. However, these 2 concepts are already widely used in
>>>> Spark, e.g. DataFrameWriter.partitionBy and DDL syntax like ADD PARTITION.
>>>> To be consistent, we need to add partitioning/bucketing to Data Source V2.
>>>> Essentially, some APIs mix DDL and DML operations. I’d like to
>>>> consider ways to fix that problem instead of carrying the problem forward
>>>> to Data Source V2. We can solve this by adding a high-level API for DDL and
>>>> a better write/insert API that works well with it. Clearly, that discussion
>>>> is independent of the read path, which is why I think separating the two
>>>> proposals would be a win.
>>>> rb
>>>> ​
>>>> On Wed, Aug 30, 2017 at 4:28 AM, Reynold Xin <> wrote:
>>>>> That might be good to do, but it seems orthogonal to this effort
>>>>> itself. It would be a completely different interface.
>>>>> On Wed, Aug 30, 2017 at 1:10 PM Wenchen Fan <> wrote:
>>>>>> OK, I agree. How about we add a new interface to push down the query
>>>>>> plan, based on the current framework? We can mark the
>>>>>> query-plan-push-down interface as unstable, to save the effort of
>>>>>> designing a stable representation of the query plan and maintaining
>>>>>> forward compatibility.
>>>>>> On Wed, Aug 30, 2017 at 10:53 AM, James Baker <> wrote:
>>>>>>> I'll just focus on the one-by-one thing for now - it's the thing
>>>>>>> that blocks me the most.
>>>>>>> I think the place where we're most confused here is on the cost of
>>>>>>> determining whether I can push down a filter. For me, in order to work
>>>>>>> out whether I can push down a filter or satisfy a sort, I might have to
>>>>>>> read plenty of data. That said, it's worth me doing this because I can
>>>>>>> use this information to avoid reading far more data than that.
>>>>>>> If you give me all the orderings, I will have to read that data many
>>>>>>> times (we stream it to avoid keeping it in memory).
>>>>>>> There's also a thing where our typical use cases have many filters
>>>>>>> (20+ is common). So, it's likely not going to work to pass us all the
>>>>>>> combinations. That said, if I can tell you a cost, I know what optimal
>>>>>>> looks like, so why can't I just pick that myself?
>>>>>>> The current design is friendly to simple datasources, but does not
>>>>>>> have the potential to support this.
>>>>>>> So the main problem we have with datasources v1 is that it's
>>>>>>> essentially impossible to leverage a bunch of Spark features - I don't
>>>>>>> get to use bucketing or row batches or all the nice things that I really
>>>>>>> want to use to get decent performance. Provided I can leverage these in
>>>>>>> a moderately supported way which won't break in any given commit, I'll
>>>>>>> be pretty happy with anything that lets me opt out of the restrictions.
>>>>>>> My suggestion here is that if you make a mode which works well for
>>>>>>> complicated use cases, you end up being able to write the simple mode in
>>>>>>> terms of it very easily. So we could actually provide two APIs: one that
>>>>>>> lets people who have more interesting datasources leverage the cool
>>>>>>> features, and one that lets people who just want to implement basic
>>>>>>> features do that - I'd try to include some kind of layering here. I
>>>>>>> could probably sketch out something here if that'd be useful?
>>>>>>> James
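James's layering idea (a full-featured API with the simple mode written on top of it) might look roughly like the sketch below. It assumes Java 16+ for records, and all types (`PushDownRequest`, `PushDownResult`, `AdvancedSource`, `SimpleSource`, `SimpleSourceAdapter`) are hypothetical. The adapter implements the advanced contract for a source that pushes nothing down, by returning every requested operation to Spark as unhandled.

```java
import java.util.List;

// Hypothetical request/response types: everything Spark would like pushed down at once,
// and the subset the source could not handle.
record PushDownRequest(List<String> filters, List<String> sortColumns) {}
record PushDownResult(List<String> unhandledFilters, List<String> unhandledSortColumns) {}

// The "advanced" layer: sees the full request and decides what it can absorb.
interface AdvancedSource {
    PushDownResult pushDown(PushDownRequest request);
    List<String> read();
}

// The "simple" layer: just produces rows and never pushes anything down.
interface SimpleSource {
    List<String> read();
}

// Expresses the simple API in terms of the advanced one: every requested
// operation is handed back to Spark as unhandled.
final class SimpleSourceAdapter implements AdvancedSource {
    private final SimpleSource delegate;

    SimpleSourceAdapter(SimpleSource delegate) {
        this.delegate = delegate;
    }

    public PushDownResult pushDown(PushDownRequest request) {
        return new PushDownResult(request.filters(), request.sortColumns());
    }

    public List<String> read() {
        return delegate.read();
    }
}
```

A simple source then only writes `read()`; the adapter supplies the rest of the advanced contract, so Spark could talk to a single interface internally.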
>>>>>>> On Tue, 29 Aug 2017 at 18:59 Wenchen Fan <> wrote:
>>>>>>>> Hi James,
>>>>>>>> Thanks for your feedback! I think your concerns are all valid; we need
>>>>>>>> to make a tradeoff here.
>>>>>>>> > Explicitly here, what I'm looking for is a convenient mechanism to
>>>>>>>> > accept a fully specified set of arguments
>>>>>>>> The problem with this approach is: 1) if we want to add more arguments
>>>>>>>> in the future, it's really hard to do so without changing the existing
>>>>>>>> interface; 2) if a user wants to implement a very simple data source,
>>>>>>>> they have to look at all the arguments and understand them, which may
>>>>>>>> be a burden.
>>>>>>>> I don't have a solution to these 2 problems; comments welcome.
>>>>>>>> > There are loads of cases like this - you can imagine someone being
>>>>>>>> > able to push down a sort before a filter is applied, but not
>>>>>>>> > afterwards. However, maybe the filter is so selective that it's
>>>>>>>> > better to push down the filter and not handle the sort. I don't get
>>>>>>>> > to make this decision, Spark does (but doesn't have good enough
>>>>>>>> > information to do it properly, whilst I do). I want to be able to
>>>>>>>> > choose the parts I push down given knowledge of my datasource - as
>>>>>>>> > defined the APIs don't let me do that, they're strictly more
>>>>>>>> > restrictive than the V1 APIs in this way.
>>>>>>>> This is true: the current framework applies push downs one by one,
>>>>>>>> incrementally. If a data source wants to go back and accept a sort push
>>>>>>>> down after it accepts a filter push down, that's impossible with the
>>>>>>>> current data source V2.
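A hypothetical sketch of the one-by-one behaviour, assuming Java 11+. The interfaces `SupportsPushDownSort` and `SupportsPushDownFilter` and the classes `ToySource` and `OneByOnePlanner` are illustrative names, not the prototype's actual mix-ins. The toy source can only sort by `(b, c)` once the filter `a = 1` is in place, so if the sort happens to be offered first it is rejected, and the single-pass order gives the source no chance to revise that answer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical mix-ins in the one-by-one style: each pushdown is offered once and
// the answer (true = accepted) is final.
interface SupportsPushDownSort { boolean pushSort(String... columns); }
interface SupportsPushDownFilter { boolean pushFilter(String filter); }

// A toy source whose capability depends on what was pushed before: it can deliver
// rows sorted by (b, c) only after the filter "a = 1" has been accepted.
final class ToySource implements SupportsPushDownSort, SupportsPushDownFilter {
    private final List<String> acceptedFilters = new ArrayList<>();

    public boolean pushFilter(String filter) {
        acceptedFilters.add(filter);
        return true;
    }

    public boolean pushSort(String... columns) {
        return Arrays.equals(columns, new String[] {"b", "c"})
                && acceptedFilters.contains("a = 1");
    }
}

final class OneByOnePlanner {
    // Offers the sort and the filter exactly once each, in the given order, and reports
    // whether the sort ended up pushed down. A rejected offer is never repeated.
    static boolean sortPushed(boolean sortOfferedFirst) {
        ToySource source = new ToySource();
        if (sortOfferedFirst) {
            boolean sortAccepted = source.pushSort("b", "c"); // rejected: no filter yet
            source.pushFilter("a = 1");                        // accepted, too late for the sort
            return sortAccepted;
        }
        source.pushFilter("a = 1");
        return source.pushSort("b", "c");                      // accepted
    }
}
```

The same source ends up with or without the sort purely because of the order in which the planner asks.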
>>>>>>>> Fortunately, we have a solution for this problem. On the Spark side, we
>>>>>>>> actually do have a fully specified set of arguments waiting to be pushed
>>>>>>>> down, but Spark doesn't know the best order in which to push them into
>>>>>>>> the data source. Spark can try every combination and ask the data source
>>>>>>>> to report a cost, then pick the combination with the lowest cost. This
>>>>>>>> can also be implemented as a cost report interface, so that advanced
>>>>>>>> data sources can implement it for optimal performance, while simple data
>>>>>>>> sources don't need to care about it and can stay simple.
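The cost-report idea could be sketched as follows, assuming Java 11+. `SupportsReportCost` and `CostBasedPushDown` are hypothetical names, and the cost numbers used to exercise it are invented purely for illustration. Spark-side code enumerates every subset of the candidate pushdowns, asks the source for a cost, and keeps the cheapest.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical optional mix-in: the source estimates the cost of a scan if the given
// pushdowns were all accepted together. Not a real Spark interface.
interface SupportsReportCost {
    double estimateCost(List<String> pushdowns);
}

final class CostBasedPushDown {
    // Tries every subset of the candidates (2^n of them, including the empty one)
    // and returns the subset with the lowest reported cost.
    static List<String> cheapest(List<String> candidates, SupportsReportCost source) {
        int n = candidates.size();
        List<String> best = List.of();
        double bestCost = source.estimateCost(best);
        for (int mask = 1; mask < (1 << n); mask++) {
            List<String> subset = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                if ((mask & (1 << i)) != 0) {
                    subset.add(candidates.get(i));
                }
            }
            double cost = source.estimateCost(subset);
            if (cost < bestCost) {
                bestCost = cost;
                best = subset;
            }
        }
        return best;
    }
}
```

For example, with a toy cost model where pushing only the filter costs 10, only the sort costs 80, nothing costs 100, and both together are unsupported, `cheapest` returns just the filter. Because the loop visits all 2^n subsets, the number of cost calls doubles with each extra candidate: with 20 filters that is 2^20 = 1,048,576 calls, which is the scaling concern James raises about typical workloads having 20+ filters.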
>>>>>>>> The current design is very friendly to simple data sources and has the
>>>>>>>> potential to support complex ones, so I prefer the current design over
>>>>>>>> the plan push down one. What do you think?
>>>>>>>> On Wed, Aug 30, 2017 at 5:53 AM, James Baker <> wrote:
>>>>>>>>> Yeah, for sure.
>>>>>>>>> With the stable representation - agree that in the general case this
>>>>>>>>> is pretty intractable; it restricts the modifications that you can do
>>>>>>>>> in the future too much. That said, it shouldn't be as hard if you
>>>>>>>>> restrict yourself to the parts of the plan which are supported by the
>>>>>>>>> datasources V2 API (which, after all, need to be translatable properly
>>>>>>>>> into the future to support the mixins proposed). This should have a
>>>>>>>>> pretty small scope in comparison. As long as the user can bail out of
>>>>>>>>> nodes they don't understand, they should be ok, right?
>>>>>>>>> That said, what would also be fine for us is a place to plug into an
>>>>>>>>> unstable query plan.
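To make "bail out of nodes they don't understand" concrete, here is a hedged sketch assuming Java 16+. The plan classes (`Plan`, `Scan`, `FilterNode`, `SortNode`, `Opaque`, `PushedScan`) and `PlanPushDown` are a deliberately tiny hypothetical stand-in, not Catalyst's logical plan. The source absorbs Filter and Sort nodes sitting directly on the scan and leaves everything from the first node it does not understand upward to Spark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical plan nodes: a few kinds a source might understand, plus Opaque
// standing in for anything it does not (UDFs, joins, ...).
interface Plan {}
record Scan(String table) implements Plan {}
record FilterNode(String condition, Plan child) implements Plan {}
record SortNode(List<String> columns, Plan child) implements Plan {}
record Opaque(String description, Plan child) implements Plan {}
// Replacement for the absorbed part of the plan: filters first, then the sort.
record PushedScan(String table, List<String> filters, List<String> sort) implements Plan {}

final class PlanPushDown {
    // Rewrites the plan bottom-up. Filter and Sort nodes directly on the scan are absorbed
    // when the source accepts them; an Opaque node is kept, and since its result is no
    // longer a PushedScan, nothing above it is absorbed (the "bail out").
    static Plan pushDown(Plan plan, Predicate<String> canFilter, boolean canSort) {
        if (plan instanceof Scan scan) {
            return new PushedScan(scan.table(), List.of(), List.of());
        }
        if (plan instanceof FilterNode filter) {
            Plan child = pushDown(filter.child(), canFilter, canSort);
            // Conservative: only absorb filters that sit below any absorbed sort.
            if (child instanceof PushedScan pushed && pushed.sort().isEmpty()
                    && canFilter.test(filter.condition())) {
                List<String> filters = new ArrayList<>(pushed.filters());
                filters.add(filter.condition());
                return new PushedScan(pushed.table(), List.copyOf(filters), pushed.sort());
            }
            return new FilterNode(filter.condition(), child);
        }
        if (plan instanceof SortNode sort) {
            Plan child = pushDown(sort.child(), canFilter, canSort);
            if (child instanceof PushedScan pushed && pushed.sort().isEmpty() && canSort) {
                return new PushedScan(pushed.table(), pushed.filters(), sort.columns());
            }
            return new SortNode(sort.columns(), child);
        }
        if (plan instanceof Opaque opaque) {
            return new Opaque(opaque.description(), pushDown(opaque.child(), canFilter, canSort));
        }
        throw new IllegalArgumentException("unknown plan node: " + plan);
    }
}
```

With `Sort(b) <- Filter(a = 1) <- Scan(t)` everything collapses into one `PushedScan`; inserting an `Opaque` node between the sort and the filter stops absorption at that node, and the sort stays with Spark.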
>>>>>>>>> Explicitly here, what I'm looking for is a convenient mechanism to
>>>>>>>>> accept a fully specified set of arguments (of which I can choose to
>>>>>>>>> ignore some), and return the information as to which of them I'm
>>>>>>>>> ignoring. Taking a query plan of sorts is a way of doing this which IMO
>>>>>>>>> is intuitive to the user. It also provides a convenient location to
>>>>>>>>> plug in things like stats. Not at all married to the idea of using a
>>>>>>>>> query plan here; it just seemed convenient.
>>>>>>>>> Regarding the users who just want to be able to pump data into Spark,
>>>>>>>>> my understanding is that replacing isolated nodes in a query plan is
>>>>>>>>> easy. That said, our goal here is to be able to push down as much as
>>>>>>>>> possible into the underlying datastore.
>>>>>>>>> To your second question:
>>>>>>>>> The issue is that if you build up pushdowns incrementally and not all
>>>>>>>>> at once, you end up having to reject pushdowns and filters that you
>>>>>>>>> actually can do, which unnecessarily increases overheads.
>>>>>>>>> For example, the dataset
>>>>>>>>> a b c
>>>>>>>>> 1 2 3
>>>>>>>>> 1 3 3
>>>>>>>>> 1 3 4
>>>>>>>>> 2 1 1
>>>>>>>>> 2 0 1
>>>>>>>>> can efficiently push down sort(b, c) if I have already applied the
>>>>>>>>> filter a = 1, but otherwise will force a sort in Spark. On the PR I
>>>>>>>>> detail a case I see where I can push down two equality filters iff I
>>>>>>>>> am given them at the same time, whilst not being able to one at a time.
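The dataset example can be checked directly. This sketch (Java 16+, hypothetical names `Row` and `SortAfterFilter`) assumes the rows are stored in the order listed: the full table is not in `(b, c)` order, but the rows with `a = 1` already are, so a source that sees the filter first could satisfy `sort(b, c)` without any extra sorting.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// One row of the (a, b, c) table from the example.
record Row(int a, int b, int c) {}

final class SortAfterFilter {
    // The five rows, assumed to be stored in exactly this order.
    static final List<Row> TABLE = List.of(
            new Row(1, 2, 3), new Row(1, 3, 3), new Row(1, 3, 4),
            new Row(2, 1, 1), new Row(2, 0, 1));

    static final Comparator<Row> BY_B_THEN_C =
            Comparator.comparingInt(Row::b).thenComparingInt(Row::c);

    // True if the rows are already in (b, c) order, i.e. sort(b, c) costs nothing extra.
    static boolean sortedByBC(List<Row> rows) {
        for (int i = 1; i < rows.size(); i++) {
            if (BY_B_THEN_C.compare(rows.get(i - 1), rows.get(i)) > 0) {
                return false;
            }
        }
        return true;
    }

    // Applies the filter a = value, preserving storage order.
    static List<Row> whereAEquals(List<Row> rows, int value) {
        return rows.stream().filter(r -> r.a() == value).collect(Collectors.toList());
    }
}
```

`sortedByBC(TABLE)` is false (the row `(2, 1, 1)` follows `(1, 3, 4)`), while `sortedByBC(whereAEquals(TABLE, 1))` is true, which is why the sort is only cheap once the filter is known.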
>>>>>>>>> There are loads of cases like this - you can imagine someone being
>>>>>>>>> able to push down a sort before a filter is applied, but not
>>>>>>>>> afterwards. However, maybe the filter is so selective that it's better
>>>>>>>>> to push down the filter and not handle the sort. I don't get to make
>>>>>>>>> this decision, Spark does (but doesn't have good enough information to
>>>>>>>>> do it properly, whilst I do). I want to be able to choose the parts I
>>>>>>>>> push down given knowledge of my datasource - as defined, the APIs don't
>>>>>>>>> let me do that; they're strictly more restrictive than the V1 APIs in
>>>>>>>>> this way.
>>>>>>>>> The pattern of not considering things that can be done in bulk bites
>>>>>>>>> us in other ways. The retrieval methods end up being trickier to
>>>>>>>>> implement than necessary because frequently a single operation
>>>>>>>>> provides the result of many of the getters, but the state is mutable,
>>>>>>>>> so you end up with odd caches.
>>>>>>>>> For example, the work I need to do to answer unhandledFilters in V1 is
>>>>>>>>> roughly the same as the work I need to do to buildScan, so I want to
>>>>>>>>> cache it. This means that I end up with code that looks like this:
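>>>>>>>>> import com.google.common.base.Supplier;
>>>>>>>>> import com.google.common.base.Suppliers;
>>>>>>>>> import java.util.List;
>>>>>>>>> import static java.util.Collections.emptyList;
>>>>>>>>>
>>>>>>>>> // Foo, Bar and Filter are placeholder types from the original message.
>>>>>>>>> // Wraps a Foo and memoizes computeBar for the most recent filter list.
>>>>>>>>> public final class CachingFoo implements Foo {
>>>>>>>>>     private final Foo delegate;
>>>>>>>>>     private List<Filter> currentFilters = emptyList();
>>>>>>>>>     private Supplier<Bar> barSupplier = newSupplier(currentFilters);
>>>>>>>>>     public CachingFoo(Foo delegate) {
>>>>>>>>>         this.delegate = delegate;
>>>>>>>>>     }
>>>>>>>>>     // Guava's Suppliers.memoize runs the expensive computation at most once.
>>>>>>>>>     private Supplier<Bar> newSupplier(List<Filter> filters) {
>>>>>>>>>         return Suppliers.memoize(() -> delegate.computeBar(filters));
>>>>>>>>>     }
>>>>>>>>>     @Override
>>>>>>>>>     public Bar computeBar(List<Filter> filters) {
>>>>>>>>>         // Recompute only when the filters differ from the cached ones.
>>>>>>>>>         if (!filters.equals(currentFilters)) {
>>>>>>>>>             currentFilters = filters;
>>>>>>>>>             barSupplier = newSupplier(filters);
>>>>>>>>>         }
>>>>>>>>>         return barSupplier.get();
>>>>>>>>>     }
>>>>>>>>> }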
>>>>>>>>> which caches the result required in unhandledFilters on the
>>>>>>>>> expectation that Spark will call buildScan afterwards and get to use
>>>>>>>>> the result.
>>>>>>>>> This kind of cache becomes more prominent, but harder to deal with, in
>>>>>>>>> the new APIs. As one example, the state I will need in order to compute
>>>>>>>>> accurate column stats internally will likely be a subset of the work
>>>>>>>>> required to get the read tasks, tell you if I can handle filters, etc.,
>>>>>>>>> so I'll want to cache them for reuse. However, the cached information
>>>>>>>>> needs to be invalidated appropriately when I add a new filter or sort
>>>>>>>>> order or limit, and this makes implementing the APIs harder and more
>>>>>>>>> error-prone.
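One way to reduce that invalidation burden, offered only as a sketch under assumptions (Java 16+; `PushDownState` and `AnalysisCache` are hypothetical names), is to key the cached analysis on an immutable snapshot of all pushed-down state, so any new filter, sort order, or limit changes the key and forces a recomputation.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical snapshot of everything pushed down so far. The lists are copied so the
// key cannot change after it is stored; "did anything change?" is one equals() call.
record PushDownState(List<String> filters, List<String> sortOrder, Integer limit) {
    PushDownState {
        filters = List.copyOf(filters);
        sortOrder = List.copyOf(sortOrder);
    }
}

// Single-entry cache in the spirit of CachingFoo, but keyed on the whole pushdown state,
// so a new filter, sort order or limit all invalidate it the same way.
final class AnalysisCache<R> {
    private final Function<PushDownState, R> compute;
    private PushDownState cachedKey;   // null until the first call
    private R cachedValue;
    private int computations = 0;      // only here to make the caching observable

    AnalysisCache(Function<PushDownState, R> compute) {
        this.compute = compute;
    }

    synchronized R get(PushDownState state) {
        if (!Objects.equals(state, cachedKey)) {
            cachedValue = compute.apply(state);
            cachedKey = state;
            computations++;
        }
        return cachedValue;
    }

    synchronized int computations() {
        return computations;
    }
}
```

This does not remove the recomputation cost; it only removes the need to track which individual change invalidates which cached field.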
>>>>>>>>> One thing that'd be great is a defined contract for the order in which
>>>>>>>>> Spark calls the methods on your datasource (ideally this contract could
>>>>>>>>> be implied by the way the Java class structure works, but otherwise I
>>>>>>>>> can just throw).
>>>>>>>>> James
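A call-order contract "implied by the way the Java class structure works" could look like the following staged (type-state) sketch, assuming Java 16+. `ScanPlan`, `FilterStage`, `SortStage` and `ToyReader` are hypothetical; each step returns a type that exposes only the next step, and each stage's decisions are fixed when it is created.

```java
import java.util.ArrayList;
import java.util.List;

// Immutable result of the negotiation: what the source will do, and what Spark must do.
record ScanPlan(List<String> pushedFilters, List<String> pushedSort, List<String> unhandled) {}

// Hypothetical staged API: filters first, then sort, enforced by the types.
interface FilterStage {
    SortStage pushFilters(List<String> filters);
}

interface SortStage {
    ScanPlan pushSort(List<String> columns);
}

// Toy reader: handles equality filters on column a, and can return rows sorted by
// (b, c) only when the filter "a = 1" was pushed in the earlier stage.
final class ToyReader implements FilterStage {
    public SortStage pushFilters(List<String> filters) {
        List<String> pushed = new ArrayList<>();
        List<String> unhandled = new ArrayList<>();
        for (String f : filters) {
            if (f.startsWith("a = ")) {
                pushed.add(f);
            } else {
                unhandled.add(f);
            }
        }
        // The returned stage captures the filter decision; later steps only read it.
        return columns -> {
            boolean sortOk = pushed.contains("a = 1") && columns.equals(List.of("b", "c"));
            List<String> rest = new ArrayList<>(unhandled);
            if (!sortOk) {
                rest.add("sort " + columns);
            }
            return new ScanPlan(List.copyOf(pushed),
                    sortOk ? List.copyOf(columns) : List.<String>of(), List.copyOf(rest));
        };
    }
}
```

With this shape, `new ToyReader().pushSort(...)` does not compile, because `pushSort` exists only on the `SortStage` returned by `pushFilters`. The trade-off is that the order is now fixed by the API, which is the same one-by-one rigidity discussed above for sorts and filters.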
>>>>>>>>> On Tue, 29 Aug 2017 at 02:56 Reynold Xin <> wrote:
>>>>>>>>>> James,
>>>>>>>>>> Thanks for the comment. I think you just pointed out a trade-off
>>>>>>>>>> between expressiveness and API simplicity, compatibility and
>>>>>>>>>> evolvability. For maximum expressiveness, we'd want the ability to
>>>>>>>>>> expose full query plans and let the data source decide which part of
>>>>>>>>>> the query plan can be pushed down.
>>>>>>>>>> The downsides to that (full query plan push down) are:
>>>>>>>>>> 1. It is extremely difficult to design a stable representation for
>>>>>>>>>> logical / physical plans. It is doable, but we'd be the first to do
>>>>>>>>>> it. I'm not aware of any mainstream database being able to do that in
>>>>>>>>>> the past. The design of that API itself, to make sure we have a good
>>>>>>>>>> story for backward and forward compatibility, would probably take
>>>>>>>>>> months if not years. It might still be good to do, or to offer an
>>>>>>>>>> experimental trait without a compatibility guarantee that uses the
>>>>>>>>>> current Catalyst internal logical plan.
>>>>>>>>>> 2. Most data source developers simply want a way to offer some data,
>>>>>>>>>> without any pushdown. Having to understand query plans is a burden
>>>>>>>>>> rather than a gift.
>>>>>>>>>> Re: your point about the proposed v2 being worse than v1 for your use
>>>>>>>>>> case.
>>>>>>>>>> Can you say more? You used the argument that in v2 there is more
>>>>>>>>>> support for broader pushdown and as a result it is harder to
>>>>>>>>>> implement. That's how it is supposed to be. If a data source simply
>>>>>>>>>> implements one of the traits, it'd be logically identical to v1. I
>>>>>>>>>> don't see why it would be worse or better, other than v2 providing
>>>>>>>>>> much stronger forward compatibility guarantees than v1.
>>>>>>>>>> On Tue, Aug 29, 2017 at 4:54 AM, James Baker <> wrote:
>>>>>>>>>>> Copying from the code review comments I just submitted on the draft
>>>>>>>>>>> API (spark/pull/10#pullrequestreview-59088745):
>>>>>>>>>>> Context here is that I've spent some time implementing a Spark
>>>>>>>>>>> datasource and have had some issues with the current API which are
>>>>>>>>>>> made worse in V2.
>>>>>>>>>>> The general conclusion I’ve come to here is that this is very hard to
>>>>>>>>>>> actually implement (in a similar but more aggressive way than
>>>>>>>>>>> DataSource V1, because of the extra methods and dimensions we get in
>>>>>>>>>>> V2).
>>>>>>>>>>> In DataSources V1 PrunedFilteredScan, the issue is that you are passed
>>>>>>>>>>> in the filters with the buildScan method, and then passed in again
>>>>>>>>>>> with the unhandledFilters method.
>>>>>>>>>>> However, the filters that you can’t handle might be data dependent,
>>>>>>>>>>> which the current API does not handle well. Suppose I can handle
>>>>>>>>>>> filter A some of the time, and filter B some of the time. If I’m
>>>>>>>>>>> passed in both, then either A and B are unhandled, or A, or B, or
>>>>>>>>>>> neither. The work I have to do to work this out is essentially the
>>>>>>>>>>> same as I have to do while actually generating my RDD (essentially I
>>>>>>>>>>> have to generate my partitions), so I end up doing some weird caching
>>>>>>>>>>> work.
>>>>>>>>>>> This V2 API proposal has the same issues, but perhaps more so. In
>>>>>>>>>>> PrunedFilteredScan, there is essentially one degree of freedom for
>>>>>>>>>>> pruning (filters), so you just have to implement caching between
>>>>>>>>>>> unhandledFilters and buildScan. However, here we have many degrees of
>>>>>>>>>>> freedom: sorts, individual filters, clustering, sampling, maybe
>>>>>>>>>>> aggregations eventually - and these operations are not all
>>>>>>>>>>> commutative, and computing my support one-by-one can easily end up
>>>>>>>>>>> being more expensive than computing it all in one go.
>>>>>>>>>>> For some trivial examples:
>>>>>>>>>>> - After filtering, I might be sorted, whilst before filtering I might
>>>>>>>>>>>   not be.
>>>>>>>>>>> - Filtering with certain filters might affect my ability to push down
>>>>>>>>>>>   others.
>>>>>>>>>>> - Filtering with aggregations (as mooted) might not be possible to
>>>>>>>>>>>   push down.
>>>>>>>>>>> And with the API as currently mooted, I need to be able to go back
>>>>>>>>>>> and change my results because they might change later.
>>>>>>>>>>> Really what would be good here is to pass all of the filters and
>>>>>>>>>>> sorts etc. all at once, and then I return the parts I can’t handle.
>>>>>>>>>>> I’d prefer in general that this be implemented by passing some kind
>>>>>>>>>>> of query plan to the datasource which enables this kind of
>>>>>>>>>>> replacement. I explicitly don’t want to give the whole query plan -
>>>>>>>>>>> that sounds painful - and would prefer we push down only the parts of
>>>>>>>>>>> the query plan we deem to be stable. With the mix-in approach, I don’t
>>>>>>>>>>> think we can guarantee the properties we want without a two-phase
>>>>>>>>>>> thing - I’d really love to be able to just define a straightforward
>>>>>>>>>>> union type which is our supported pushdown stuff, and then the user
>>>>>>>>>>> can transform and return it.
>>>>>>>>>>> I think this ends up being a more elegant API for consumers, and also
>>>>>>>>>>> far more intuitive.
>>>>>>>>>>> James
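The "pass everything at once, return what I can't handle" contract can be sketched as below, assuming Java 16+. `Eq`, `SupportsBulkFilterPushDown` and `CompositeKeySource` are hypothetical names; the toy source mirrors the case mentioned in the thread of two equality filters that can only be pushed down together (here modelled as a composite key on `x` and `y`).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical equality predicate "column = value".
record Eq(String column, Object value) {}

// Hypothetical all-at-once contract: the source sees every candidate filter in one
// call and returns the ones Spark must still evaluate itself.
interface SupportsBulkFilterPushDown {
    List<Eq> pushFilters(List<Eq> filters);
}

// Toy source backed by a composite key on (x, y): a key lookup needs equality filters
// on both columns, so it can only accept them when it sees them together.
final class CompositeKeySource implements SupportsBulkFilterPushDown {
    public List<Eq> pushFilters(List<Eq> filters) {
        boolean hasX = filters.stream().anyMatch(f -> f.column().equals("x"));
        boolean hasY = filters.stream().anyMatch(f -> f.column().equals("y"));
        List<Eq> unhandled = new ArrayList<>();
        for (Eq f : filters) {
            boolean onKey = f.column().equals("x") || f.column().equals("y");
            if (!(onKey && hasX && hasY)) {
                unhandled.add(f); // handled only as a pair; everything else goes back to Spark
            }
        }
        return unhandled;
    }
}
```

Given `x = 1`, `y = 2` and `z = 3` in one call, only `z = 3` comes back as unhandled; offered `x = 1` on its own, the source has to hand it back, which is exactly what a one-filter-at-a-time protocol would force.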
>>>>>>>>>>> On Mon, 28 Aug 2017 at 18:00 蒋星博 <> wrote:
>>>>>>>>>>>> +1 (Non-binding)
>>>>>>>>>>>> On August 28, 2017, Xiao Li <> wrote:
>>>>>>>>>>>>> +1
>>>>>>>>>>>>> 2017-08-28 12:45 GMT-07:00 Cody Koeninger:
>>>>>>>>>>>>>> Just wanted to point out that because the jira isn't labeled
>>>>>>>>>>>>>> SPIP, it won't have shown up linked from
>>>>>>>>>>>>>> On Mon, Aug 28, 2017 at 2:20 PM, Wenchen Fan <> wrote:
>>>>>>>>>>>>>> > Hi all,
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > It has been almost 2 weeks since I proposed the data source V2 for
>>>>>>>>>>>>>> > discussion, and we have already received some feedback on the JIRA
>>>>>>>>>>>>>> > ticket and the prototype PR, so I'd like to call for a vote.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > The full document of the Data Source API V2 is:
>>>>>>>>>>>>>> > nt/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Note that this vote should focus on the high-level design/framework,
>>>>>>>>>>>>>> > not specific APIs, as we can always change/improve specific APIs
>>>>>>>>>>>>>> > during development.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > The vote will be up for the next 72 hours. Please reply with your
>>>>>>>>>>>>>> > vote:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > +1: Yeah, let's go forward and implement the SPIP.
>>>>>>>>>>>>>> > +0: Don't really care.
>>>>>>>>>>>>>> > -1: I don't think this is a good idea because of the following
>>>>>>>>>>>>>> > technical reasons.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Thanks!
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix

Ryan Blue
Software Engineer
