beam-dev mailing list archives

From: Ankur Goenka
Subject: Re: Pubsub to Beam SQL
Date: Fri, 04 May 2018 00:44:33 GMT
I like the idea of exposing the source timestamp in TBLPROPERTIES, which is
closely tied to the source (KafkaIO, KinesisIO, MqttIO, AmqpIO, unbounded
FileIO, PubSubIO).
Exposing the timestamp as a top-level keyword would break the symmetry
between streaming and batch pipelines.
TBLPROPERTIES gives us the flexibility to define the timestamp in a
source-specific way if needed.
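
A rough sketch of how that might look, reusing the table from Andrew's
example quoted below. The TYPE and LOCATION values and the
`timestampAttributeKey` property name are placeholders made up for
illustration, not an agreed syntax:

```sql
-- Hypothetical sketch: the timestamp source is configured in TBLPROPERTIES
-- instead of through a top-level TIMESTAMP keyword.
CREATE TABLE table_name (
  publishTimestamp TIMESTAMP,
  attributes MAP(VARCHAR, VARCHAR),
  payload ROW (
    name VARCHAR,
    age INTEGER,
    isSWE BOOLEAN,
    tags ARRAY(VARCHAR)))
TYPE 'pubsub'            -- assumed type name
LOCATION '<topic path>'  -- placeholder, to be filled in per source
TBLPROPERTIES '{"timestampAttributeKey": "createTime"}';  -- assumed key
```

Under this assumption, the statement keeps the same shape for batch and
streaming sources, and only the JSON blob differs per source.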

On Thu, May 3, 2018 at 4:08 PM Kenneth Knowles wrote:

> It is an interesting question for Beam DDL - since timestamps are
> fundamental to Beam's data model, should we have a DDL extension that makes
> it very explicit? Seems nice, but perhaps TBLPROPERTIES is a way to stage
> the work, getting the functionality in place first and the parsing second.
> What would the TIMESTAMP (let's maybe choose a term that isn't already
> reserved) metadata thing look like for e.g. KafkaIO, KinesisIO, MqttIO,
> AmqpIO, unbounded FileIO? I think a lot of these don't actually have any
> configurability so maybe it is moot. Does Calcite already have an opinion
> about timestamps on rows?
> Kenn
> On Thu, May 3, 2018 at 1:02 PM Andrew Pilloud wrote:
>> I like to avoid magic too. I might not have been entirely clear in what I
>> was asking. Here is an example of what I had in mind, replacing the TBLPROPERTIES
>> with a more generic TIMESTAMP option:
>> CREATE TABLE table_name (
>>   publishTimestamp TIMESTAMP,
>>   attributes MAP(VARCHAR, VARCHAR),
>>   payload ROW (
>>     name VARCHAR,
>>     age INTEGER,
>>     isSWE BOOLEAN,
>>     tags ARRAY(VARCHAR)))
>> TIMESTAMP attributes["createTime"];
>> Andrew
>> On Thu, May 3, 2018 at 12:47 PM Anton Kedin wrote:
>>> I think it makes sense for the case when the timestamp is provided in the
>>> payload (including pubsub message attributes). We can mark the field as an
>>> event timestamp. But if the timestamp is internally defined by the source
>>> (pubsub message publish time) and not exposed in the event body, then we
>>> need a source-specific mechanism to extract and map the event timestamp to
>>> the schema. This is, of course, if we don't automatically add a magic
>>> timestamp field which Beam SQL can populate behind the scenes and add to
>>> the schema. I want to avoid this magic path for now.
>>> On Thu, May 3, 2018 at 11:10 AM Andrew Pilloud wrote:
>>>> This sounds awesome!
>>>> Is the event timestamp something that we need to specify for every source?
>>>> If so, I would suggest we add this as a first-class option on CREATE TABLE
>>>> rather than something hidden in TBLPROPERTIES.
>>>> Andrew
>>>> On Wed, May 2, 2018 at 10:30 AM Anton Kedin wrote:
>>>>> Hi
>>>>> I am working on adding functionality to support querying Pubsub
>>>>> messages directly from Beam SQL.
>>>>> *Goal*
>>>>>   Provide Beam users with a pure SQL solution for creating pipelines with
>>>>> Pubsub as a data source, without the need to set up the pipelines in
>>>>> Java before applying the query.
>>>>> *High level approach*
>>>>>    - Build on top of PubsubIO;
>>>>>    - Pubsub source will be declared using a CREATE TABLE DDL statement:
>>>>>       - Beam SQL already supports declaring sources like Kafka and
>>>>>       Text using CREATE TABLE DDL;
>>>>>       - it supports additional configuration using the TBLPROPERTIES
>>>>>       clause. Currently it takes a text blob, where we can put a JSON
>>>>>       configuration;
>>>>>       - wrapping PubsubIO into a similar source looks feasible;
>>>>>    - The plan is to initially support only messages with a JSON payload:
>>>>>       - more payload formats can be added later;
>>>>>    - Messages will be fully described in the CREATE TABLE statements:
>>>>>       - event timestamps. Source of the timestamp is configurable. It
>>>>>       is required by Beam SQL to have an explicit timestamp column for
>>>>>       support;
>>>>>       - message attributes map;
>>>>>       - JSON payload schema;
>>>>>    - Event timestamps will be taken either from the publish time or from a
>>>>>    user-specified message attribute (configurable).
>>>>> Thoughts, ideas, comments?
>>>>> More details are in the doc here:
>>>>> Thank you,
>>>>> Anton
