spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ryan Blue <rb...@netflix.com.INVALID>
Subject Re: [discuss] Data Source V2 write path
Date Wed, 27 Sep 2017 19:40:05 GMT
Comments inline. I've written up what I'm proposing with a bit more detail.

On Tue, Sep 26, 2017 at 11:17 AM, Wenchen Fan <cloud0fan@gmail.com> wrote:

> I'm trying to give a summary:
>
> Ideally data source API should only deal with data, not metadata. But one
> key problem is, Spark still need to support data sources without metastore,
> e.g. file format data sources.
>
> For this kind of data sources, users have to pass the metadata information
> like partitioning/bucketing to every write action of a "table"(or other
> identifiers like path of a file format data source), and it's user's
> responsibility to make sure these metadata information are consistent. If
> it's inconsistent, the behavior is undefined, different data sources may
> have different behaviors.
>

Agreed so far. One minor point is that we currently throws an exception if
you try to configure, for example, partitioning and also use `insertInto`.


> If we agree on this, then data source write API should have a way to pass
> these metadata information, and I think using data source options is a good
> choice because it's the most implicit way and doesn't require new APIs.
>

What I don't understand is why we "can't avoid this problem" unless you
mean the last point, that we have to support this. I don't think that using
data source options is a good choice, but maybe I don't understand the
alternatives. Here's a straw-man version of what I'm proposing so you can
tell me what's wrong with it or why options are a better choice.

I'm assuming we start with a query like this:
```
df.write.partitionBy("utc_date").bucketBy("primary_key").format("parquet").
saveAsTable("s3://bucket/path/")
```

That creates a logical node, `CreateTableAsSelect`, with some options. It
would contain a `Relation` (or `CatalogTable` definition?) that corresponds
to the user's table name and `partitionBy`, `format`, etc. calls. It should
also have a write mode and the logical plan for `df`.

When this CTAS logical node is turned into a physical plan, the relation
gets turned into a `DataSourceV2` instance and then Spark gets a writer and
configures it with the proposed API. The main point of this is to pass the
logical relation (with all of the user's options) through to the data
source, not the writer. The data source creates the writer and can tell the
writer what to do. Another benefit of this approach is that the relation
gets resolved during analysis, when it is easy to add sorts and other
requirements to the logical plan.

If we were to implement what I'm suggesting, then we could handle metadata
conflicts outside of the `DataSourceV2Writer`, in the data source. That
eliminates problems about defining behavior when there are conflicts (the
next point) and prepares implementations for a catalog API that would
standardize how those conflicts are handled. In the short term, this
doesn't have to be in a public API yet. It can be special handling for
HadoopFS relations that we can eventually use underneath a public API.

Please let me know if I've misunderstood something. Now that I've written
out how we could actually implement conflict handling outside of the
writer, I can see that it isn't as obvious of a change as I thought. But, I
think in the long term this would be a better way to go.


> But then we have another problem: how to define the behavior for data
> sources with metastore when the given options contain metadata information?
> A typical case is `DataFrameWriter.saveAsTable`, when a user calls it with
> partition columns, he doesn't know what will happen. The table may not
> exist and he may create the table successfully with specified partition
> columns, or the table already exist but has inconsistent partition columns
> and Spark throws exception. Besides, save mode doesn't play well in this
> case, as we may need different save modes for data and metadata.
>
> My proposal: data source API should only focus on data, but concrete data
> sources can implement some dirty features via options. e.g. file format
> data sources can take partitioning/bucketing from options, data source with
> metastore can use a special flag in options to indicate a create table
> command(without writing data).
>

I can see how this would make changes smaller, but I don't think it is a
good thing to do. If we do this, then I think we will not really accomplish
what we want to with this (a clean write API).


> In other words, Spark connects users to data sources with a clean protocol
> that only focus on data, but this protocol has a backdoor: the data source
> options. Concrete data sources are free to define how to deal with
> metadata, e.g. Cassandra data source can ask users to create table at
> Cassandra side first, then write data at Spark side, or ask users to
> provide more details in options and do CTAS at Spark side. These can be
> done via options.
>
> After catalog federation, hopefully only file format data sources still
> use this backdoor.
>

Why would file format sources use a back door after catalog federation??

rb

-- 
Ryan Blue
Software Engineer
Netflix

Mime
View raw message