spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ryan Blue <rb...@netflix.com.INVALID>
Subject Re: [discuss] Data Source V2 write path
Date Mon, 02 Oct 2017 17:42:05 GMT
As far as changes to the public API go, I’d prefer deprecating the API that
mixes data and metadata operations. But I don’t think that requires that we
go with your proposal #1, where the current write API can’t use data source
v2 writers. I think we can separate the metadata operations for Hadoop FS
tables from writes using the current public API. If this isn’t possible for
Hadoop FS sources, I’d like to understand *why* it isn’t possible (*how*
would it require a catalog API?). This also requires special handling for
Hadoop FS sources, it could be that this plan would cause some existing v1
sources, to lack certain operations before the catalog API is ready. We
should identify what those implementations are and then see whether this
would be worth it.

To sum up what I’m saying, here’s my proposal:

   - *4* — DataFrameWriter.save supports data source v2 writers, which
   assume that tables exist and have some configuration. That configuration is
   passed as a Relation or CatalogTable when the writer is created. For
   Hadoop FS tables, we would move metadata operations out of the v2 writer
   and would implement them in a command or using a non-public API called by a
   command. This doesn’t require waiting for a catalog API because it would
   have the same behavior as v1, just *implemented outside the new write
   API*.

This would not require any public API changes, but would prepare for the
catalog API by separating metadata operations from the writers. If writers
are expected to perform some metadata operations, then we won’t be able to
cleanly add the catalog API without changing the expectations of existing
writers.

Eventually, we should deprecate the existing write API so expectations are
clear like they are in SQL. I’ve been thinking it should look like this
(but this is by no means well thought-through):

df.insert.into("db.table") // inserts into db.table by column name
df.insert.byPosition.into("db.table") // insert by
positiondf.save.as("db.table") // creates db.table as select, fails if
exists

I’d also like an API for table creation, which may have more options than
the df.save.as("..."). This would all be after the catalog API is
introduced, of course.

rb
​

On Sun, Oct 1, 2017 at 8:25 AM, Wenchen Fan <cloud0fan@gmail.com> wrote:

> The main entries of data source inside Spark is the SQL API and
> `DataFrameReader/Writer`.
>
> For SQL API, I think the semantic is well defined, the data and metadata
> operations are separated. E.g., INSERT INTO means write data into an
> existing table, CREATE TABLE means only create the metadata. But the
> problem is, data source v1 can't provide metadata access, so we still mix
> data and metadata operations at data source level. Data source v2 can solve
> this problem, by introducing a `CatalogSupport` trait which can allow data
> source to provide metadata access.
>
> For DataFrameWriter API, e.g. `df.write.format(...).save()`, there is no
> separation of data and metadata. According to the document, it seems Spark
> doesn't care about metadata here, and assumes the underlying data sources
> take care of it.
>
> I have 3 proposals:
> 1. `DataFrameWriter.save` doesn't support data source v2, we create new
> APIs(or reusing `DataFrameWriter.insertInto/saveAsTable`?) for data
> source v2 that separate data and metadata clearly.
> 2. Update the semantic of `DataFrameWriter.save` and say it's only for
> writing data, not metadata. Then data source v2 can separate data and
> metadata clearly, because the upper API also seperates them.
> 3. `DataFrameWriter.save` supports data source v2, and concrete data
> source implementations define their own behaviors. They can separate data
> and metadata operations, and fail `DataFrameWriter.save` if the table
> doesn't exist. They can mix the data and metadata operations in
> the write path, and use `options` to carry metadata information.
>
> Proposal 1 and 2 are similar, and proposal 1 is better because changing
> the semantic of an existing API is not good. Both proposal 1 and 2 need to
> wait for catalog federation before releasing data source v2, so that we can
> clearly define what catalog functionalities the data source should provide.
> Both proposal 1 and 2 force data sources to provide metadata
> access(catalog), which is unfriendly to simple data sources that don't have
> metastore.
>
> Personally I prefer proposal 3, because it's not blocked by catalog
> federation, so that we can develop it incrementally. And it makes the
> catalog support optional, so that simple data sources without metastore can
> also implement data source v2.
>
> More proposals are welcome!
>
>
> On Sat, Sep 30, 2017 at 3:26 AM, Ryan Blue <rblue@netflix.com> wrote:
>
>> > Spark doesn't know how to create a table in external systems like
>> Cassandra, and that's why it's currently done inside the data source writer.
>>
>> This isn't a valid argument for doing this task in the writer for v2. If
>> we want to fix the problems with v1, we shouldn't continue to mix write
>> operations with table metadata changes simply because it is more convenient
>> and requires less refactoring.
>>
>> I'm proposing that in v2 we move creation of file system tables outside
>> of the writer, but still in a non-public implementation. Cassandra and
>> other external stores would behave as they should today and assume the
>> table exists, or would wait to use the v2 API until there is catalog
>> support.
>>
>> The important thing is that we don't set a standard that writers can
>> create tables, which is going to lead to different behavior across
>> implementations when we have conflicts between an existing table's config
>> and the options passed into the writer.
>>
>> > For now, Spark just assumes data source writer takes care of it. For
>> the internal file format data source, I propose to pass partition/bucket
>> information to the writer via options, other data sources can define their
>> own behavior, e.g. they can also use the options, or disallow users to
>> write data to a non-existing table and ask users to create the table in the
>> external systems first.
>>
>> The point is preventing data sources from defining their own behavior so
>> we can introduce consistent behavior across sources for v2.
>>
>> rb
>>
>> On Thu, Sep 28, 2017 at 8:49 PM, Wenchen Fan <cloud0fan@gmail.com> wrote:
>>
>>> > When this CTAS logical node is turned into a physical plan, the
>>> relation gets turned into a `DataSourceV2` instance and then Spark gets a
>>> writer and configures it with the proposed API. The main point of this is
>>> to pass the logical relation (with all of the user's options) through to
>>> the data source, not the writer. The data source creates the writer and can
>>> tell the writer what to do.
>>>
>>> Here is the problem: Spark doesn't know how to create a table in
>>> external systems like Cassandra, and that's why it's currently done inside
>>> the data source writer.
>>>
>>> In the future, we can add a new trait `CatalogSupport` for
>>> `DataSourceV2`, so that we can use your proposal and separate metadata
>>> management from data source writer.
>>>
>>> For now, Spark just assumes data source writer takes care of it. For the
>>> internal file format data source, I propose to pass partition/bucket
>>> information to the writer via options, other data sources can define their
>>> own behavior, e.g. they can also use the options, or disallow users to
>>> write data to a non-existing table and ask users to create the table in the
>>> external systems first.
>>>
>>>
>>>
>>> On Thu, Sep 28, 2017 at 5:45 AM, Russell Spitzer <
>>> russell.spitzer@gmail.com> wrote:
>>>
>>>> On an unrelated note, is there any appetite for making the write path
>>>> also include an option to return elements that were not
>>>> able to be processed for some reason.
>>>>
>>>> Usage might be like
>>>>
>>>> saveAndIgnoreFailures() : Dataset
>>>>
>>>> So that if some records cannot be parsed by the datasource for writing,
>>>> or violate some contract with the datasource the records can be returned
>>>> for further processing or dealt with by an alternate system.
>>>>
>>>> On Wed, Sep 27, 2017 at 12:40 PM Ryan Blue <rblue@netflix.com.invalid>
>>>> wrote:
>>>>
>>>>> Comments inline. I've written up what I'm proposing with a bit more
>>>>> detail.
>>>>>
>>>>> On Tue, Sep 26, 2017 at 11:17 AM, Wenchen Fan <cloud0fan@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I'm trying to give a summary:
>>>>>>
>>>>>> Ideally data source API should only deal with data, not metadata.
But
>>>>>> one key problem is, Spark still need to support data sources without
>>>>>> metastore, e.g. file format data sources.
>>>>>>
>>>>>> For this kind of data sources, users have to pass the metadata
>>>>>> information like partitioning/bucketing to every write action of
a
>>>>>> "table"(or other identifiers like path of a file format data source),
and
>>>>>> it's user's responsibility to make sure these metadata information
are
>>>>>> consistent. If it's inconsistent, the behavior is undefined, different
data
>>>>>> sources may have different behaviors.
>>>>>>
>>>>>
>>>>> Agreed so far. One minor point is that we currently throws an
>>>>> exception if you try to configure, for example, partitioning and also
use
>>>>> `insertInto`.
>>>>>
>>>>>
>>>>>> If we agree on this, then data source write API should have a way
to
>>>>>> pass these metadata information, and I think using data source options
is
>>>>>> a good choice because it's the most implicit way and doesn't require
new
>>>>>> APIs.
>>>>>>
>>>>>
>>>>> What I don't understand is why we "can't avoid this problem" unless
>>>>> you mean the last point, that we have to support this. I don't think
that
>>>>> using data source options is a good choice, but maybe I don't understand
>>>>> the alternatives. Here's a straw-man version of what I'm proposing so
you
>>>>> can tell me what's wrong with it or why options are a better choice.
>>>>>
>>>>> I'm assuming we start with a query like this:
>>>>> ```
>>>>> df.write.partitionBy("utc_date").bucketBy("primary_key").for
>>>>> mat("parquet").saveAsTable("s3://bucket/path/")
>>>>> ```
>>>>>
>>>>> That creates a logical node, `CreateTableAsSelect`, with some options.
>>>>> It would contain a `Relation` (or `CatalogTable` definition?) that
>>>>> corresponds to the user's table name and `partitionBy`, `format`, etc.
>>>>> calls. It should also have a write mode and the logical plan for `df`.
>>>>>
>>>>> When this CTAS logical node is turned into a physical plan, the
>>>>> relation gets turned into a `DataSourceV2` instance and then Spark gets
a
>>>>> writer and configures it with the proposed API. The main point of this
is
>>>>> to pass the logical relation (with all of the user's options) through
to
>>>>> the data source, not the writer. The data source creates the writer and
can
>>>>> tell the writer what to do. Another benefit of this approach is that
the
>>>>> relation gets resolved during analysis, when it is easy to add sorts
and
>>>>> other requirements to the logical plan.
>>>>>
>>>>> If we were to implement what I'm suggesting, then we could handle
>>>>> metadata conflicts outside of the `DataSourceV2Writer`, in the data source.
>>>>> That eliminates problems about defining behavior when there are conflicts
>>>>> (the next point) and prepares implementations for a catalog API that
would
>>>>> standardize how those conflicts are handled. In the short term, this
>>>>> doesn't have to be in a public API yet. It can be special handling for
>>>>> HadoopFS relations that we can eventually use underneath a public API.
>>>>>
>>>>> Please let me know if I've misunderstood something. Now that I've
>>>>> written out how we could actually implement conflict handling outside
of
>>>>> the writer, I can see that it isn't as obvious of a change as I thought.
>>>>> But, I think in the long term this would be a better way to go.
>>>>>
>>>>>
>>>>>> But then we have another problem: how to define the behavior for
data
>>>>>> sources with metastore when the given options contain metadata information?
>>>>>> A typical case is `DataFrameWriter.saveAsTable`, when a user calls
it with
>>>>>> partition columns, he doesn't know what will happen. The table may
not
>>>>>> exist and he may create the table successfully with specified partition
>>>>>> columns, or the table already exist but has inconsistent partition
columns
>>>>>> and Spark throws exception. Besides, save mode doesn't play well
in this
>>>>>> case, as we may need different save modes for data and metadata.
>>>>>>
>>>>>> My proposal: data source API should only focus on data, but concrete
>>>>>> data sources can implement some dirty features via options. e.g.
file
>>>>>> format data sources can take partitioning/bucketing from options,
data
>>>>>> source with metastore can use a special flag in options to indicate
a
>>>>>> create table command(without writing data).
>>>>>>
>>>>>
>>>>> I can see how this would make changes smaller, but I don't think it is
>>>>> a good thing to do. If we do this, then I think we will not really
>>>>> accomplish what we want to with this (a clean write API).
>>>>>
>>>>>
>>>>>> In other words, Spark connects users to data sources with a clean
>>>>>> protocol that only focus on data, but this protocol has a backdoor:
the
>>>>>> data source options. Concrete data sources are free to define how
to deal
>>>>>> with metadata, e.g. Cassandra data source can ask users to create
table at
>>>>>> Cassandra side first, then write data at Spark side, or ask users
to
>>>>>> provide more details in options and do CTAS at Spark side. These
can be
>>>>>> done via options.
>>>>>>
>>>>>> After catalog federation, hopefully only file format data sources
>>>>>> still use this backdoor.
>>>>>>
>>>>>
>>>>> Why would file format sources use a back door after catalog
>>>>> federation??
>>>>>
>>>>> rb
>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>


-- 
Ryan Blue
Software Engineer
Netflix

Mime
View raw message