flink-dev mailing list archives

From Timo Walther <twal...@apache.org>
Subject Re: [DISCUSS] Some thoughts about unifying Stream SQL and Batch SQL grammar
Date Mon, 29 Aug 2016 12:59:12 GMT
At first glance, I thought we were losing the ability to distinguish
between a batch and a streaming table when a TableSource implements
both interfaces, because currently you use StreamTableSource as the default
whenever a TableSource implements both types. I think it would be better to
decide between batch and stream based on the type of the execution environment.
What do you think?
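A minimal Python sketch of this suggestion. It uses hypothetical stand-ins (`BatchTableSource`, `StreamTableSource`, `BatchEnv`, `StreamEnv`, `table_kind`), not Flink's actual classes. When a source implements both interfaces, the environment type, rather than a fixed default, decides whether the table is registered as a batch or a streaming table.

```python
class BatchTableSource:
    """Hypothetical marker: the source can produce a bounded data set."""

class StreamTableSource:
    """Hypothetical marker: the source can produce an unbounded stream."""

class BatchEnv:
    """Hypothetical stand-in for a batch execution environment."""

class StreamEnv:
    """Hypothetical stand-in for a streaming execution environment."""

def table_kind(env, source):
    """Decide batch vs. stream registration from the environment type."""
    if isinstance(env, StreamEnv):
        if not isinstance(source, StreamTableSource):
            raise TypeError("source cannot be read as a stream")
        return "stream"
    if isinstance(env, BatchEnv):
        if not isinstance(source, BatchTableSource):
            raise TypeError("source cannot be read as a batch")
        return "batch"
    raise TypeError("unknown environment type")

class OrdersSource(BatchTableSource, StreamTableSource):
    """A hypothetical source that implements both interfaces."""

src = OrdersSource()
print(table_kind(BatchEnv(), src))   # batch
print(table_kind(StreamEnv(), src))  # stream
```

With this design, the source only declares what it can produce. The owner of the environment makes the choice once.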


On 29/08/16 at 14:31, Jark Wu wrote:
> Hi Timo,
> Yes, you are right. The STREAM keyword is invalid now. If the query contains
> the STREAM keyword, the parser throws a "can’t convert table xxx to stream" exception,
> because we register the table as a regular table, not as a streamable one.
> - Jark Wu
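Here is a toy Python sketch of the behavior Jark describes. It assumes a hypothetical `Catalog` that stores a per-table "streamable" flag, plus a regex-based `validate` function that stands in for the real parser and validator. Tables are registered as regular tables, so a `SELECT STREAM` query is rejected.

```python
import re

class ValidationError(Exception):
    """Raised when a query is rejected before planning."""

class Catalog:
    """Hypothetical catalog mapping table names to a 'streamable' flag."""

    def __init__(self):
        self._streamable = {}

    def register(self, name, streamable=False):
        # In the prototype, every table is registered as a regular table.
        self._streamable[name] = streamable

    def is_streamable(self, name):
        if name not in self._streamable:
            raise ValidationError(f"Table '{name}' not found")
        return self._streamable[name]

# Toy pattern for "SELECT [STREAM] ... FROM <table>"; not a real SQL parser.
_QUERY = re.compile(r"^\s*SELECT\s+(STREAM\s+)?.*?\bFROM\s+(\w+)",
                    re.IGNORECASE | re.DOTALL)

def validate(catalog, sql):
    """Return the queried table name or raise ValidationError."""
    match = _QUERY.match(sql)
    if match is None:
        raise ValidationError("Unsupported query")
    uses_stream, table = match.group(1) is not None, match.group(2)
    streamable = catalog.is_streamable(table)
    if uses_stream and not streamable:
        # Paraphrases the "can't convert table xxx to stream" error above.
        raise ValidationError(f"Can't convert table '{table}' to stream")
    return table

catalog = Catalog()
catalog.register("Orders")
print(validate(catalog, "SELECT * FROM Orders"))  # Orders
```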
>> On 29 Aug 2016, at 8:13 PM, Timo Walther <twalthr@apache.org> wrote:
>> Hi Jark,
>> your code looks good and it also simplifies many parts. So the STREAM keyword is
>> not optional but invalid now, right? What happens if the query contains the keyword?
>> Timo
>> On 29/08/16 at 05:40, Jark Wu wrote:
>>> Hi Fabian, Timo,
>>> I have created a prototype that removes the STREAM keyword and uses the batch SQL parser
>>> for stream jobs.
>>> This is the working branch: https://github.com/wuchong/flink/tree/remove-stream
>>> Looking forward to your feedback.
>>> - Jark Wu
>>>> On 24 Aug 2016, at 4:56 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>>> Starting with a prototype would be great, Jark.
>>>> We had some trouble with Calcite's StreamableTable interface anyway. A few
>>>> things can be simplified if we do not declare our tables as streamable.
>>>> I would try to implement DataStreamTable (and all related classes and
>>>> methods) analogous to DataSetTable, if possible.
>>>> Best, Fabian
>>>> 2016-08-24 6:27 GMT+02:00 Jark Wu <wuchong.wc@alibaba-inc.com>:
>>>>> Hi Fabian,
>>>>> You are right, the main thing we need to change to remove the STREAM
>>>>> keyword is the table registration. If you like, I can do a prototype.
>>>>> Hi Timo,
>>>>> I’m glad to contribute our work back to Flink. I will look into it and
>>>>> create JIRAs in the next few days.
>>>>> - Jark Wu
>>>>>> On 24 Aug 2016, at 12:13 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>>>>>> Hi Jark,
>>>>>> We can think about whether to remove the STREAM keyword or not. In principle,
>>>>>> Calcite should allow the same windowing syntax on streaming and static
>>>>>> tables (this is one of the main goals of Calcite). The Table API can also
>>>>>> distinguish stream and batch without the STREAM keyword by looking at the
>>>>>> ExecutionEnvironment.
>>>>>> I think we would need to change the way that tables are registered in
>>>>>> Calcite's catalog and also add more validation (check that time windows
>>>>>> refer to a time column, etc.).
>>>>>> A prototype should help us see what the consequences of removing the
>>>>>> STREAM keyword (which actually means changing the table registration; the
>>>>>> rest is the same) would be.
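The following Python sketch shows roughly how the extra validation Fabian mentions (time windows must refer to a time column) could work. `TumbleWindow`, `validate_window`, the schema dictionary, and the rule that only `TIMESTAMP` counts as a time type are all assumptions made for illustration.

```python
from dataclasses import dataclass

class ValidationError(Exception):
    """Raised when a window definition is invalid."""

@dataclass(frozen=True)
class TumbleWindow:
    """Hypothetical window spec: TUMBLE(time_column, size)."""
    time_column: str
    size_ms: int

# Assumption: only TIMESTAMP columns count as time columns.
TIME_TYPES = {"TIMESTAMP"}

def validate_window(schema, window):
    """Check that a window refers to an existing time column and has a positive size."""
    col_type = schema.get(window.time_column)
    if col_type is None:
        raise ValidationError(f"Unknown column '{window.time_column}'")
    if col_type not in TIME_TYPES:
        raise ValidationError(
            f"Window must refer to a time column, but '{window.time_column}' is {col_type}")
    if window.size_ms <= 0:
        raise ValidationError("Window size must be positive")

schema = {"rowtime": "TIMESTAMP", "productId": "INT", "units": "INT"}
validate_window(schema, TumbleWindow("rowtime", 3_600_000))  # accepted, no exception
```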
>>>>>> Regarding streaming aggregates without a window definition: we can certainly
>>>>>> implement this feature in the Table API. There are a few points that need
>>>>>> to be considered, such as value expiration after a certain period of update
>>>>>> inactivity (otherwise the state might grow without bound). But these issues
>>>>>> should be rather easy to solve. I think for SQL, such running aggregates
>>>>>> are a special case of the sliding windows discussed in Calcite's
>>>>>> StreamSQL document [1].
>>>>>> Thanks also for the document! I'll take that into account when sketching
>>>>>> the FLIP for streaming aggregation support.
>>>>>> Cheers, Fabian
>>>>>> [1] http://calcite.apache.org/docs/stream.html#sliding-windows
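The running aggregate discussed above can be sketched minimally in Python as a per-key SUM without a window, where a value expires after a period of update inactivity. `RunningSum` and its `ttl` parameter are hypothetical. A real operator would keep this in managed state and use timers rather than scanning all keys on every update.

```python
class RunningSum:
    """Non-windowed running SUM per key; keys idle longer than ttl are dropped."""

    def __init__(self, ttl):
        self.ttl = ttl
        self._state = {}  # key -> (running_sum, last_update_time)

    def update(self, key, value, now):
        """Add value to key's running sum at time `now` and return the new sum."""
        self._expire(now)
        total, _ = self._state.get(key, (0, now))
        total += value
        self._state[key] = (total, now)
        return total

    def _expire(self, now):
        # Drop every key whose last update is older than ttl.
        stale = [k for k, (_, t) in self._state.items() if now - t > self.ttl]
        for k in stale:
            del self._state[k]

    def keys(self):
        return set(self._state)

agg = RunningSum(ttl=10)
agg.update("a", 5, now=0)   # 5
agg.update("a", 3, now=4)   # 8
agg.update("b", 1, now=20)  # 1; "a" was idle for 16 > 10 and is dropped
agg.update("a", 2, now=21)  # 2; "a" starts again from empty state
```

Dropping idle state bounds memory, but it also changes results: a key that reappears after expiration starts again from an empty aggregate.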
>>>>>> 2016-08-23 13:09 GMT+02:00 Jark Wu <wuchong.wc@alibaba-inc.com>:
>>>>>>> Hi Fabian, Timo,
>>>>>>> Sorry for the late response.
>>>>>>> Regarding Calcite’s StreamSQL syntax, my only concerns are the STREAM
>>>>>>> keyword and the lack of aggregation without windows, which make the syntax
>>>>>>> different for streaming and static tables. I don’t think Flink should have a
>>>>>>> custom SQL syntax, but it’s better to have a consistent syntax for batch and
>>>>>>> streaming. Regarding the window syntax, I think it’s good and reasonable to
>>>>>>> follow Calcite’s syntax. Actually, we implemented Blink SQL windows following
>>>>>>> Calcite’s syntax [1].
>>>>>>> In addition, I describe the Blink SQL design, including UDF, UDTF, and
>>>>>>> Window, in a Google doc [1]. I hope it helps with the upcoming Flink SQL
>>>>>>> design.
>>>>>>> +1 for creating a FLIP
>>>>>>> [1] https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek
>>>>>>> - Jark Wu
>>>>>>>> On 23 Aug 2016, at 3:47 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>>>>>>> Hi,
>>>>>>>> I did a bit of prototyping yesterday to check to what extent Calcite
>>>>>>>> supports window operations on streams if we implemented them for the
>>>>>>>> Table API.
>>>>>>>> For the Table API we do not go through Calcite's SQL parser and validator,
>>>>>>>> but generate the logical plan (a tree of RelNodes) ourselves, mostly using
>>>>>>>> Calcite's RelBuilder.
>>>>>>>> It turns out that Calcite does not restrict grouped aggregations on streams
>>>>>>>> at this abstraction level, i.e., it does not perform any checks.
>>>>>>>> I think it should be possible to implement windowed aggregates for the
>>>>>>>> Table API. Once CALCITE-1345 [1] is implemented (and released), windowed
>>>>>>>> aggregates will also be supported by the SQL parser, validator, and optimizer.
>>>>>>>> To make them work with our implementation, we would need to adapt
>>>>>>>> our solution to it (only internally), but I am sure we could reuse a lot of
>>>>>>>> our initial implementation (Table API, validation, execution).
>>>>>>>> I drafted an API proposal a few months ago [2] and could convert it into
>>>>>>>> a FLIP to discuss the API and break it down into subtasks.
>>>>>>>> What do you think?
>>>>>>>> Cheers, Fabian
>>>>>>>> [1] https://issues.apache.org/jira/browse/CALCITE-1345
>>>>>>>> [2] https://docs.google.com/document/d/19kSOAOINKCSWLBCKRq2WvNtmuaA9o3AyCh2ePqr3V5E
>>>>>>>> 2016-08-19 11:04 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>>>>>>>>> Hi Jark,
>>>>>>>>> thanks for starting this discussion. Actually, I think we are "blocked"
>>>>>>>>> on the internal handling of streaming windows in Calcite rather than on
>>>>>>>>> the SQL parser. IMO, it should be possible to exchange or modify the parser
>>>>>>>>> if we want to.
>>>>>>>>> Regarding Calcite's StreamSQL syntax: except for the STREAM keyword,
>>>>>>>>> Calcite closely follows the SQL standard (e.g., no special keywords like
>>>>>>>>> WINDOW; instead, stream-specific aspects like tumbling windows are expressed
>>>>>>>>> as functions such as TUMBLE [1]). One main motivation of the Calcite community
>>>>>>>>> is to have the same syntax for streaming and static tables. This includes
>>>>>>>>> support for tables that are static and streaming at the same time (the
>>>>>>>>> example in [1] is a table of orders to which new order records are
>>>>>>>>> added). When querying such a table, the STREAM keyword is required to
>>>>>>>>> distinguish between a batch query, which returns a result set, and a
>>>>>>>>> standing query, which returns a result stream. In the context of Flink we
>>>>>>>>> can make this distinction using the type of the TableEnvironment. So we
>>>>>>>>> could use the batch parser, but would need to change a couple of things
>>>>>>>>> internally and add checks for proper grouping on the timestamp column when
>>>>>>>>> doing windows, etc. So far, the discussion about the StreamSQL syntax has
>>>>>>>>> mostly focused on whether 1) StreamSQL should follow the SQL
>>>>>>>>> standard (as Calcite proposes) or 2) Flink should use a custom syntax with
>>>>>>>>> stream-specific features. For instance, a tumbling window is expressed in
>>>>>>>>> the GROUP BY clause [1] when following standard SQL, but it could be defined
>>>>>>>>> using a special WINDOW keyword in a custom StreamSQL syntax.
>>>>>>>>> You are right that we have a dependency on Calcite. However, I think this
>>>>>>>>> dependency lies in the internals rather than in the parser, i.e., in how the
>>>>>>>>> validator/optimizer supports and handles monotone / quasi-monotone attributes
>>>>>>>>> and windows. I am not sure how much is already supported, but the Calcite
>>>>>>>>> community is working on this [2]. I think we need these features in Calcite
>>>>>>>>> unless we want to completely remove our dependency on Calcite for
>>>>>>>>> StreamSQL. I would not be in favor of removing Calcite at this point. We
>>>>>>>>> put a lot of effort into refactoring the Table API internals. Instead, we
>>>>>>>>> should start talking to the Calcite community to see how far along they are,
>>>>>>>>> what is missing, and how we can help.
>>>>>>>>> I will start a discussion on the Calcite dev mailing list in the next few days
>>>>>>>>> and ask about the status of StreamSQL.
>>>>>>>>> Best,
>>>>>>>>> Fabian
>>>>>>>>> [1] http://calcite.apache.org/docs/stream.html#tumbling-windows-improved
>>>>>>>>> [2] https://issues.apache.org/jira/browse/CALCITE-1345
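Here is a batch-style Python sketch of what grouping by `TUMBLE(rowtime, INTERVAL '1' HOUR), productId` computes, which makes "a tumbling window is expressed in the GROUP BY clause" concrete. The orders columns (`rowtime`, `productId`, `units`), the function names, and the alignment of windows to epoch 0 are assumptions for illustration.

```python
from collections import defaultdict

def tumble_start(ts_ms, size_ms):
    """Start of the tumbling window that contains ts_ms (windows aligned to epoch 0)."""
    return ts_ms - ts_ms % size_ms

def tumbling_count_sum(rows, size_ms):
    """Batch-style emulation of
    GROUP BY TUMBLE(rowtime, size), productId with COUNT(*) and SUM(units).

    rows: iterable of (rowtime_ms, product_id, units).
    Returns {(window_start_ms, product_id): (count, sum_units)}.
    """
    acc = defaultdict(lambda: [0, 0])  # (window_start, product_id) -> [count, sum]
    for rowtime, product_id, units in rows:
        key = (tumble_start(rowtime, size_ms), product_id)
        acc[key][0] += 1
        acc[key][1] += units
    return {key: (count, total) for key, (count, total) in acc.items()}

HOUR_MS = 3_600_000
orders = [(0, "p1", 2), (1_000, "p1", 3), (HOUR_MS, "p1", 1), (10, "p2", 4)]
print(tumbling_count_sum(orders, HOUR_MS))
```

A streaming executor computes the same groups, but it must also decide when a window is complete and can be emitted. That decision depends on the handling of monotone / quasi-monotone attributes mentioned above.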
>> -- 
>> Freundliche Grüße / Kind Regards
>> Timo Walther
>> Follow me: @twalthr
>> https://www.linkedin.com/in/twalthr
