spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Wenchen Fan <>
Subject Re: [discuss] Data Source V2 write path
Date Mon, 25 Sep 2017 18:26:56 GMT
Catalog federation is to publish the Spark catalog API(kind of a data
source API for metadata), so that Spark is able to read/write metadata from
external systems. (SPARK-15777)

Currently Spark can only read/write Hive metastore, which means for other
systems like Cassandra, we can only implicitly create tables with data
source API.

Again this is not ideal but just a workaround before we finish catalog
federation. That's why the save mode description mostly refer to how data
will be handled instead of metadata.

Because of this, I think we still need to pass metadata like
partitioning/bucketing to the data source write API. And I propose to use
data source options so that it's not at API level and we can easily ignore
these options in the future if catalog federation is done.

The same thing applies to Hadoop FS data sources, we need to pass metadata
to the writer anyway.

On Tue, Sep 26, 2017 at 1:08 AM, Ryan Blue <> wrote:

> However, without catalog federation, Spark doesn’t have an API to ask an
> external system(like Cassandra) to create a table. Currently it’s all done
> by data source write API. Data source implementations are responsible to
> create or insert a table according to the save mode.
> What’s catalog federation? Is there a SPIP for it? It sounds
> straight-forward based on your comments, but I’d rather make sure we’re
> talking about the same thing.
> What I’m proposing doesn’t require a change to either the public API, nor
> does it depend on being able to create tables. Why do writers necessarily
> need to create tables? I think other components (e.g. a federated catalog)
> should manage table creation outside of this abstraction. Just because data
> sources currently create tables doesn’t mean that we are tied to that
> implementation.
> I would also disagree that data source implementations are responsible for
> creating for inserting according to save mode. The modes are “append”,
> “overwrite”, “failIfExists” and “ignore”, and the descriptions indicate to
> me that the mode refers to how *data* will be handled, not table
> metadata. Overwrite’s docs
> <>
> state that “existing *data* is expected to be overwritten.”
> Save mode currently introduces confusion because it isn’t clear whether
> the mode applies to tables or to writes. In Hive, overwrite removes
> conflicting partitions, but I think the Hadoop FS relations will delete
> tables. We get around this some by using external tables and preserving
> data, but this is an area where we should have clear semantics for external
> systems like Cassandra. I’d like to see a cleaner public API that separates
> these concerns, but that’s a different discussion. For now, I don’t think
> requiring that a table exists is unreasonable. If a table has no metastore
> (Hadoop FS tables) then we can just pass the table metadata in when
> creating the writer since there is no existence in this case.
> rb
> ​
> On Sun, Sep 24, 2017 at 7:17 PM, Wenchen Fan <> wrote:
>> I agree it would be a clean approach if data source is only responsible
>> to write into an already-configured table. However, without catalog
>> federation, Spark doesn't have an API to ask an external system(like
>> Cassandra) to create a table. Currently it's all done by data source write
>> API. Data source implementations are responsible to create or insert a
>> table according to the save mode.
>> As a workaround, I think it's acceptable to pass partitioning/bucketing
>> information via data source options, and data sources should decide to take
>> these informations and create the table, or throw exception if these
>> informations don't match the already-configured table.
>> On Fri, Sep 22, 2017 at 9:35 AM, Ryan Blue <> wrote:
>>> > input data requirement
>>> Clustering and sorting within partitions are a good start. We can always
>>> add more later when they are needed.
>>> The primary use case I'm thinking of for this is partitioning and
>>> bucketing. If I'm implementing a partitioned table format, I need to tell
>>> Spark to cluster by my partition columns. Should there also be a way to
>>> pass those columns separately, since they may not be stored in the same way
>>> like partitions are in the current format?
>>> On Wed, Sep 20, 2017 at 3:10 AM, Wenchen Fan <>
>>> wrote:
>>>> Hi all,
>>>> I want to have some discussion about Data Source V2 write path before
>>>> starting a voting.
>>>> The Data Source V1 write path asks implementations to write a DataFrame
>>>> directly, which is painful:
>>>> 1. Exposing upper-level API like DataFrame to Data Source API is not
>>>> good for maintenance.
>>>> 2. Data sources may need to preprocess the input data before writing,
>>>> like cluster/sort the input by some columns. It's better to do the
>>>> preprocessing in Spark instead of in the data source.
>>>> 3. Data sources need to take care of transaction themselves, which is
>>>> hard. And different data sources may come up with a very similar approach
>>>> for the transaction, which leads to many duplicated codes.
>>>> To solve these pain points, I'm proposing a data source writing
>>>> framework which is very similar to the reading framework, i.e.,
>>>> WriteSupport -> DataSourceV2Writer -> WriteTask -> DataWriter. You
can take
>>>> a look at my prototype to see what it looks like:
>>>> There are some other details need further discussion:
>>>> 1. *partitioning/bucketing*
>>>> Currently only the built-in file-based data sources support them, but
>>>> there is nothing stopping us from exposing them to all data sources. One
>>>> question is, shall we make them as mix-in interfaces for data source v2
>>>> reader/writer, or just encode them into data source options(a
>>>> string-to-string map)? Ideally it's more like options, Spark just transfers
>>>> these user-given informations to data sources, and doesn't do anything for
>>>> it.
>>>> 2. *input data requirement*
>>>> Data sources should be able to ask Spark to preprocess the input data,
>>>> and this can be a mix-in interface for DataSourceV2Writer. I think we need
>>>> to add clustering request and sorting within partitions request, any more?
>>>> 3. *transaction*
>>>> I think we can just follow `FileCommitProtocol`, which is the internal
>>>> framework Spark uses to guarantee transaction for built-in file-based data
>>>> sources. Generally speaking, we need task level and job level commit/abort.
>>>> Again you can see more details in my prototype about it:
>>>> 4. *data source table*
>>>> This is the trickiest one. In Spark you can create a table which points
>>>> to a data source, so you can read/write this data source easily by
>>>> referencing the table name. Ideally data source table is just a pointer
>>>> which points to a data source with a list of predefined options, to save
>>>> users from typing these options again and again for each query.
>>>> If that's all, then everything is good, we don't need to add more
>>>> interfaces to Data Source V2. However, data source tables provide special
>>>> operators like ALTER TABLE SCHEMA, ADD PARTITION, etc., which requires data
>>>> sources to have some extra ability.
>>>> Currently these special operators only work for built-in file-based
>>>> data sources, and I don't think we will extend it in the near future, I
>>>> propose to mark them as out of the scope.
>>>> Any comments are welcome!
>>>> Thanks,
>>>> Wenchen
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
> --
> Ryan Blue
> Software Engineer
> Netflix

View raw message