From: Kenneth Knowles
Date: Sat, 16 Nov 2019 10:58:20 -0800
Subject: Re: [PROPOSAL] Add support for writing flattened schemas to pubsub
To: dev

Big +1 from me.

Nice explanation.
This makes a lot of sense. Much simpler to understand with fewer magic
strings. It also makes the Beam SQL connector less dependent on newer SQL
features that are simply less widespread. I'm not too surprised that
Calcite's nested row support lags behind the rest of the library. It simply
isn't as widespread and important as flat relational structures. And MAP is
even less widespread.

Kenn

On Wed, Nov 13, 2019 at 12:32 PM Brian Hulette wrote:

> I've been looking into adding support for writing (i.e. INSERT INTO
> statements) for the pubsub DDL, which currently only supports reading. This
> DDL requires the defined schema to have exactly three fields:
> event_timestamp, attributes, and payload, corresponding to the fields in
> PubsubMessage (event_timestamp can be configured to come from either
> publish time or from the value in a particular attribute, and the payload
> must be a ROW with a schema corresponding to the JSON written to the pubsub
> topic).
>
> When writing, I think it's a bit onerous to require users to use exactly
> these three top-level fields. For example, imagine we have two topics:
> people and eligible_voters. people contains a stream of {"name": "..",
> "age": XX} items, and we want eligible_voters to contain a stream with
> {"name": ".."} items corresponding to people with age >= 18. With the
> current approach this would look like:
>
> ```
> CREATE TABLE people (
>     event_timestamp TIMESTAMP,
>     attributes MAP<VARCHAR, VARCHAR>,
>     payload ROW<name VARCHAR, age INTEGER>
>   )
>   TYPE 'pubsub'
>   LOCATION 'projects/my-project/topics/my-topic'
>
> CREATE TABLE eligible_voters ....
>
> INSERT INTO eligible_voters (
>   SELECT
>     ROW(payload.name AS name) AS payload
>     FROM people
>     WHERE payload.age >= 18
> )
> ```
>
> This query has lots of renaming and boilerplate, and furthermore, ROW(..)
> doesn't seem well supported in Calcite; I had to jump through some hoops
> (like calling my fields $col1) to make something like this work.
>
> I think it would be great if we could instead handle flattened,
> payload-only schemas. We would still need to have a separate
> event_timestamp field, but everything else would map to a field in the
> payload. With this change the previous example would look like:
>
> ```
> CREATE TABLE people (
>     event_timestamp TIMESTAMP,
>     name VARCHAR,
>     age INTEGER
>   )
>   TYPE 'pubsub'
>   LOCATION 'projects/my-project/topics/my-topic'
>
> CREATE TABLE eligible_voters ...
>
> INSERT INTO eligible_voters (
>   SELECT
>     name
>     FROM people
>     WHERE age >= 18
> )
> ```
>
> This is much cleaner! But the overall approach has an obvious downside:
> with the table definition written like this it's impossible to read from or
> write to the message attributes (unless one is being used for
> event_timestamp). I think we can mitigate this in two ways:
> 1. In the future, this flattened schema definition could be represented as
> something like a view on the expanded definition. We could allow users to
> provide some metadata indicating that a column should correspond to a
> particular attribute, rather than a field in the payload. To me this feels
> similar to how you indicate a column should be indexed in a database. It's
> data that's relevant to the storage system, and not to the actual query, so
> it belongs in CREATE TABLE.
> 2. In the meantime, we can continue to support the current syntax. If a
> pubsub table definition has *exactly* three fields with the expected types:
> event_timestamp TIMESTAMP, payload ROW<...>, and attributes MAP<VARCHAR,
> VARCHAR>, we can continue to use the current codepath. Otherwise we will
> use the flattened schema.
>
> Please let me know if anyone has any objections to this approach,
> otherwise I plan on moving forward with it - I should have a PR up shortly.
>
> Brian
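
The fallback rule in point 2 of the quoted proposal (exactly three fields with the expected types select the current codepath, anything else is treated as a flattened schema) can be sketched roughly as follows. This is only an illustration in Python: the function names, the dict-of-strings schema representation, and the type strings are all assumptions made for the sketch, not Beam SQL's actual classes or the code in the eventual PR.

```python
from typing import Dict

# Hypothetical representation: a table schema is a dict of field name -> SQL type string.
# Beam SQL's real schema objects are not modeled here.
MAP_TYPE = "MAP<VARCHAR, VARCHAR>"
NESTED_FIELDS = {"event_timestamp", "attributes", "payload"}


def uses_nested_layout(fields: Dict[str, str]) -> bool:
    """True only if the table has exactly the three nested fields with the expected types."""
    if set(fields) != NESTED_FIELDS:
        return False
    return (
        fields["event_timestamp"] == "TIMESTAMP"
        and fields["attributes"] == MAP_TYPE
        and fields["payload"].startswith("ROW<")
    )


def payload_fields(fields: Dict[str, str]) -> Dict[str, str]:
    """For a flattened table, every field except event_timestamp maps to the payload."""
    return {name: sql_type for name, sql_type in fields.items() if name != "event_timestamp"}
```

Under these assumptions, the first `people` definition in the thread would take the current codepath, while the flattened one would fall through and contribute `name` and `age` as payload fields.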