From: Wenchen Fan
Date: Mon, 25 Sep 2017 10:17:10 +0800
Subject: Re: [discuss] Data Source V2 write path
To: Ryan Blue
Cc: Spark dev list

I agree it would be a clean approach if a data source were only responsible for writing into an already-configured table. However, without catalog federation, Spark doesn't have an API to ask an external system (like Cassandra) to create a table. Currently it's all done by the data source write API: data source implementations are responsible for creating a table or inserting into it according to the save mode.

As a workaround, I think it's acceptable to pass partitioning/bucketing information via data source options. Data sources should then decide whether to take this information and create the table, or throw an exception if it doesn't match the already-configured table.
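A minimal sketch of that workaround, assuming a hypothetical "partitionColumns" option key and placeholder helpers for the external system (none of these names are part of any proposed API):

  // Hypothetical sketch of the workaround: read user-supplied partitioning
  // from the options map and validate it against the table the external
  // system already knows about. The option key and helpers are assumptions.
  class ExampleWriteSupport {

    def prepareWrite(options: Map[String, String]): Unit = {
      // e.g. options("partitionColumns") = "year,month"
      val requested: Seq[String] = options.get("partitionColumns")
        .map(_.split(",").map(_.trim).toSeq)
        .getOrElse(Seq.empty)

      fetchExistingPartitioning(options) match {
        case Some(existing) if requested.nonEmpty && existing != requested =>
          // table already configured with different partitioning: fail fast
          throw new IllegalArgumentException(
            s"Requested partitioning $requested does not match existing partitioning $existing")
        case None =>
          // no table yet: create it with the requested partitioning
          createTable(options, requested)
        case _ =>
          // partitioning matches (or none was requested): proceed with the write
      }
    }

    // placeholders standing in for calls to the external system (e.g. Cassandra)
    private def fetchExistingPartitioning(options: Map[String, String]): Option[Seq[String]] = None
    private def createTable(options: Map[String, String], partitionCols: Seq[String]): Unit = ()
  }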
On Fri, Sep 22, 2017 at 9:35 AM, Ryan Blue <rblue@netflix.com> wrote:

> > input data requirement
>
> Clustering and sorting within partitions are a good start. We can always
> add more later when they are needed.
>
> The primary use case I'm thinking of for this is partitioning and
> bucketing. If I'm implementing a partitioned table format, I need to tell
> Spark to cluster by my partition columns. Should there also be a way to
> pass those columns separately, since they may not be stored in the same
> way that partitions are in the current format?
>
> On Wed, Sep 20, 2017 at 3:10 AM, Wenchen Fan <cloud0fan@gmail.com> wrote:
>
>> Hi all,
>>
>> I want to have some discussion about the Data Source V2 write path
>> before starting a vote.
>>
>> The Data Source V1 write path asks implementations to write a DataFrame
>> directly, which is painful:
>> 1. Exposing an upper-level API like DataFrame to the Data Source API is
>> not good for maintenance.
>> 2. Data sources may need to preprocess the input data before writing,
>> e.g. cluster/sort the input by some columns. It's better to do the
>> preprocessing in Spark instead of in the data source.
>> 3. Data sources need to handle transactions themselves, which is hard.
>> And different data sources may come up with very similar approaches to
>> transactions, which leads to a lot of duplicated code.
>>
>> To solve these pain points, I'm proposing a data source writing
>> framework which is very similar to the reading framework, i.e.,
>> WriteSupport -> DataSourceV2Writer -> WriteTask -> DataWriter. You can
>> take a look at my prototype to see what it looks like:
>> https://github.com/apache/spark/pull/19269
>>
>> There are some other details that need further discussion:
>>
>> 1. *partitioning/bucketing*
>> Currently only the built-in file-based data sources support them, but
>> there is nothing stopping us from exposing them to all data sources. One
>> question is, shall we make them mix-in interfaces for the data source v2
>> reader/writer, or just encode them into the data source options (a
>> string-to-string map)? Ideally this works more like options: Spark just
>> passes this user-given information to the data sources and doesn't do
>> anything with it.
>>
>> 2. *input data requirement*
>> Data sources should be able to ask Spark to preprocess the input data,
>> and this can be a mix-in interface for DataSourceV2Writer. I think we
>> need to add a clustering request and a sorting-within-partitions
>> request; do we need any more?
>>
>> 3. *transaction*
>> I think we can just follow `FileCommitProtocol`, which is the internal
>> framework Spark uses to guarantee transactional writes for the built-in
>> file-based data sources. Generally speaking, we need task-level and
>> job-level commit/abort. Again, you can see more details about it in my
>> prototype: https://github.com/apache/spark/pull/19269
>>
>> 4. *data source table*
>> This is the trickiest one. In Spark you can create a table which points
>> to a data source, so you can read/write that data source easily by
>> referencing the table name. Ideally a data source table is just a
>> pointer to a data source with a list of predefined options, to save
>> users from typing these options again and again for each query.
>> If that's all, then everything is good and we don't need to add more
>> interfaces to Data Source V2. However, data source tables provide
>> special operators like ALTER TABLE SCHEMA, ADD PARTITION, etc., which
>> require data sources to have some extra abilities.
>> Currently these special operators only work for the built-in file-based
>> data sources, and I don't think we will extend them in the near future,
>> so I propose to mark them as out of scope.
>>
>> Any comments are welcome!
>>
>> Thanks,
>> Wenchen
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
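For reference, a minimal sketch of what the WriteSupport -> DataSourceV2Writer -> WriteTask -> DataWriter chain described above could look like, including the task-level and job-level commit/abort hooks from point 3. The method signatures are assumptions for illustration; the actual interfaces are defined in the prototype PR (https://github.com/apache/spark/pull/19269):

  // Assumed shapes, loosely following the proposal; not the final API.
  trait WriteSupport {
    // called on the driver; None means the source refuses the write job
    def createWriter(jobId: String, options: Map[String, String]): Option[DataSourceV2Writer]
  }

  trait DataSourceV2Writer {
    // produces a serializable factory that executors use to write partitions
    def createWriteTask(): WriteTask
    // job-level transaction hooks, driven by Spark on the driver
    def commit(taskCommitMessages: Seq[String]): Unit
    def abort(): Unit
  }

  trait WriteTask extends Serializable {
    // runs on executors; one DataWriter per partition attempt
    def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter
  }

  trait DataWriter {
    def write(record: Seq[Any]): Unit
    // task-level transaction hooks: commit returns a message handed back to
    // DataSourceV2Writer.commit on the driver; abort rolls back this task
    def commit(): String
    def abort(): Unit
  }

The idea, following the FileCommitProtocol analogy above, is that Spark would call DataWriter.commit for each successful task and DataSourceV2Writer.commit once on the driver after all tasks have committed, or the corresponding abort on failure.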
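Point 2 (input data requirement) could similarly be a small mix-in on the writer side. A sketch with assumed trait and method names, covering the clustering and sorting-within-partitions requests mentioned above:

  // Hypothetical mix-in for DataSourceV2Writer: the data source declares how
  // Spark should preprocess the input before handing it to DataWriters.
  // Trait and method names are assumptions, not the proposed API.
  trait SupportsWriteRequirements {
    // columns Spark should cluster the input rows by,
    // e.g. the table's partition or bucket columns
    def requiredClustering: Seq[String]

    // columns Spark should sort by within each partition before writing
    def requiredOrdering: Seq[String]
  }

A partitioned table format, as in Ryan's use case, would return its partition columns from requiredClustering so that all rows for a given partition arrive at the same DataWriter.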