flink-dev mailing list archives

From Xingcan <xingc...@gmail.com>
Subject Re: STREAM SQL inner queries
Date Thu, 26 Jan 2017 10:33:51 GMT
Hi all,

I've read the document about dynamic tables. Honestly, I think it's
well defined and ingeniously reconciles batch and stream. I have two
questions about the design.

1) Though it's fine to take the stream as a snapshot of a dynamic table, a
table is essentially a set while a stream is essentially an ordered list
(with xxTime). I'm not sure whether all set operations also suit a list
(e.g. union or merge?). Of course, we can add an "order by time" to all
SQL instances, but would that be suitable?

2) As Radu said, I also think inner queries are essential for a query
language. (I didn't see any "select from (select)" in the document.) The
problem is that SQL relies on closure (the result of a query is itself a
relation), while we cannot prove that property for a stream. Can the result
of a stream operation be another input? It depends. The window operator
converts "point of time" events to "period of time" events, and I don't know
whether the nature of the data has changed. Also, partial emission will lead
to heterogeneous results.

BTW, the "Emission of dynamic tables" section seems to be a little
incompatible with the rest of the document...

Best,
Xingcan

On Thu, Jan 26, 2017 at 6:13 PM, Radu Tudoran <radu.tudoran@huawei.com>
wrote:

> Hi Shaoxuan,
>
> Thanks for the feedback!
> Regarding the proposal for relational queries that you referenced, I am a
> bit confused about its purpose and evolution with respect to the
> current implementation of stream SQL: is it supposed to replace this
> implementation, or to complement it? I will send another email about
> this, as I guess it can be a standalone discussion thread.
>
> Also, regarding the join stream-to-stream I intend to start another
> discussion about this such that we can decide all together if we can start
> some implementation/design now or we need to wait.
>
> Now, regarding the inner queries and the points you raised. It is true
> that in general an inner join would work like any other join (which
> obviously requires some buffering capabilities and mechanisms to restrict
> the infinite growth of the join state). However, at least for some cases
> we can support inner queries without buffering mechanisms or full support
> for inner join / left join. Basically, the logical operator into which an
> inner query is translated (a left join with an always-true condition) is
> to some extent more similar to UNION, and to the union implementation,
> than to the implementation we will have for joins. This is why I believe
> we can already provide support for this (I also tested a PoC
> implementation internally and it works). For cases where we could use
> this, please see the next 2 examples. Please let me know what you think
> and whether it is worth designing the jira issue, perhaps with some more
> details (including the technical details).
>
> Consider the example below:
>
> SELECT STREAM user
>         FROM inputstream
>         WHERE amount > (SELECT STREAM Min(amount2) FROM inputstream2)
>
> The point of this is to restrict the values you are selecting based on
> some value that you have from the other stream. Consider the values below
> that come in each stream
>
> inputstream   inputstream2   Result
> User1, 100                   User1 (there is no value in inputstream2 yet,
>                              so the left join should not restrict the output)
>               X, x, 10       nothing (no inputstream event to output);
>                              Min is now 10
> User2, 20                    User2 (20 is greater than 10, the minimum
>                              retained from inputstream2)
>               X, x, 20       nothing (no inputstream event to output);
>                              Min remains 10
>               X, x, 5        nothing (no inputstream event to output);
>                              Min is now 5
> User3, 8                     User3 (8 is greater than 5)
> ....
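
The trace above can be reproduced with a minimal Python sketch. This is not the PoC mentioned in the message; the function name, the event tuples, and the stream labels are hypothetical. It assumes the two streams arrive as one interleaved sequence, and that a missing minimum (no inputstream2 event yet) lets every inputstream event through, as the first row describes.

```python
from typing import Iterable, List, Optional, Tuple

# Hypothetical event shape: (stream name, key, amount).
Event = Tuple[str, str, float]


def filter_by_running_min(events: Iterable[Event]) -> List[str]:
    """Emit keys from 'inputstream' whose amount exceeds the running
    minimum seen so far on 'inputstream2'."""
    current_min: Optional[float] = None  # None until inputstream2 delivers a value
    emitted: List[str] = []
    for stream, key, amount in events:
        if stream == "inputstream2":
            # Only the single running minimum is kept, not the events themselves.
            current_min = amount if current_min is None else min(current_min, amount)
        elif stream == "inputstream":
            # Left join with an always-true condition: a missing right side
            # (None) does not restrict the output.
            if current_min is None or amount > current_min:
                emitted.append(key)
    return emitted


events: List[Event] = [
    ("inputstream", "User1", 100),
    ("inputstream2", "X", 10),
    ("inputstream", "User2", 20),
    ("inputstream2", "X", 20),
    ("inputstream2", "X", 5),
    ("inputstream", "User3", 8),
]
result = filter_by_running_min(events)
```

The only state is one number, which is the sense in which this case needs no join buffer.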
>
>
> The ultimate goal of this is, among other things, to be able to define
> multiple window computations on the same input stream. Consider:
>
> SELECT STREAM user
>         FROM inputstream
>         WHERE (SELECT STREAM AVERAGE(amount) OVER (ORDER BY timestamp
> RANGE INTERVAL 1 HOUR PRECEDING) AS hour_sum FROM inputstream) < amount
>
>
> Assume you have the following events, each arriving 30 minutes apart:
> User1, 100   -> Average is 100 and the topology that implements the
> query produces no output (100 is not greater than 100)
> User2, 10    -> Average is 55 and the topology that implements the
> query produces no output (10 is not greater than 55)
> User3, 40    -> Average is 25 ((10+40)/2) and the topology that
> implements the query outputs User3 (40 is greater than 25)
> ....
> Although the query as it is depends on aggregates and windows, the
> operator to implement the inner query can be implemented independently of
> functions that are contained in the query. Also, there is no need for a
> window or buffering to implement the logic for assembling the results from
> the inner query.
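
A minimal Python sketch of the averaging example, under stated assumptions: timestamps are minutes, the function and variable names are hypothetical, and an event exactly one hour old has already left the window (this is what makes the third average 25 rather than 50). The deque stands in for the windowed aggregate only; the comparison step needs just the latest average.

```python
from collections import deque
from typing import Deque, Iterable, List, Tuple


def above_sliding_average(
    events: Iterable[Tuple[int, str, float]], window_minutes: int = 60
) -> List[str]:
    """Emit users whose amount is strictly greater than the average of the
    amounts seen in the preceding window, current event included."""
    window: Deque[Tuple[int, float]] = deque()
    total = 0.0
    emitted: List[str] = []
    for ts, user, amount in events:
        window.append((ts, amount))
        total += amount
        # Assumption: events aged exactly window_minutes are evicted.
        while window[0][0] <= ts - window_minutes:
            _, old_amount = window.popleft()
            total -= old_amount
        average = total / len(window)
        if amount > average:
            emitted.append(user)
    return emitted


events = [(0, "User1", 100.0), (30, "User2", 10.0), (60, "User3", 40.0)]
result = above_sliding_average(events)
```

With an inclusive one-hour bound the third average would be 50 and nothing would be emitted, so the window boundary is worth pinning down in the jira issue.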
>
>
> Best regards,
>
> -----Original Message-----
> From: Shaoxuan Wang [mailto:wshaoxuan@gmail.com]
> Sent: Thursday, January 26, 2017 4:36 AM
> To: dev@flink.apache.org
> Subject: Re: STREAM SQL inner queries
>
>  Hi Radu,
> Similar to the stream-stream join, this stream-stream inner query does not
> seem to be well defined. It needs to provide at least some kind of window
> bounds to complete the streaming SQL semantics. If this is an unbounded
> join/select, a mechanism for storing the infinite data has to be
> considered. I may not fully understand your proposal. Could you please
> provide more details about this inner query, say by giving some examples of
> input and output? It would also be great if you could explain the use case
> of this inner query. This would help us understand the semantics.
>
> It should also be noted that we have recently decided to unify stream and
> batch queries under the same regular (batch) SQL. Therefore we have removed
> the support for the STREAM keyword in Flink streaming SQL. In the past
> several months, Fabian and Xiaowei Jiang have started to work on the future
> Relational Queries on Flink streaming. Fabian has drafted a very good
> design doc, https://goo.gl/m31kkE. The design is based on a new concept,
> the dynamic table, whose content changes over time and which can therefore
> be derived from streams. With dynamic tables, a stream query can be
> expressed in regular (batch) SQL. Besides some syntactic sugar, there is
> not much difference between a batch query and a stream query (in terms of
> what a query computes and where it is executed). A stream query has
> additional characteristics: when to emit a result and how to refine the
> result considering retraction.
>
> Hope this helps and look forward to working with you on streaming SQL.
>
> Regards,
> Shaoxuan
>
>
> On Wed, Jan 25, 2017 at 9:49 PM, Radu Tudoran <radu.tudoran@huawei.com>
> wrote:
>
> > Hi all,
> >
> > I would like to open a jira issue (and then provide the
> > implementation) for supporting inner queries. The idea is to be able
> > to support SQL queries as the ones presented in the scenarios below.
> > The key idea is that supporting inner queries would require to have the
> > implementation for:
> >
> > -> JOIN (type = left and condition = true) - Basically this is a simple
> > implementation for a join function between 2 streams that does not
> > require any window support behind the scenes as there is no condition
> > on which to perform the join
> >
> > -> SINGLE_VALUE - this operator would be required to provide one value to
> > be further joined. In the context of streaming this value should
> > basically evolve with the contents of the window. This could be
> > implemented with a flatmap function, as left joins would also allow
> > the mapping with null values
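
The two operators above can be sketched together in a few lines of Python. This is only an illustration with hypothetical names, not the proposed Flink implementation: SINGLE_VALUE is modelled as "the latest value seen on the right input" (an assumption about how the value evolves), and the left join with condition = true attaches that value, or None, to every left event.

```python
from typing import Any, Iterable, List, Optional, Tuple

# Hypothetical event shape: ("left" | "right", payload).
TaggedEvent = Tuple[str, Any]


def left_join_single_value(events: Iterable[TaggedEvent]) -> List[Tuple[Any, Optional[Any]]]:
    """Pair each left event with the current single value from the right
    input; None plays the role of SQL NULL when no value has arrived."""
    single_value: Optional[Any] = None
    joined: List[Tuple[Any, Optional[Any]]] = []
    for side, payload in events:
        if side == "right":
            # Assumption: a newer right-side value replaces the previous one.
            single_value = payload
        else:
            # No join condition to evaluate, so no buffering of left events.
            joined.append((payload, single_value))
    return joined


events: List[TaggedEvent] = [
    ("left", 100),
    ("right", "id-7"),
    ("left", 20),
    ("right", "id-9"),
    ("left", 8),
]
joined = left_join_single_value(events)
```

Each left event is emitted immediately, which matches the flatmap-style implementation suggested above.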
> >
> > We can then extend this initial and simple implementation to provide
> > support for joins in general (conditional joins, right joins..) or we
> > can isolate this implementation for this specific case of inner
> > queries and go with a totally new design for stream to stream joins
> > (might be needed depending on what is the decision behind on how to
> > support the conditional
> > mapping)
> >
> > What do you think about this?
> >
> > Examples of scenarios to apply
> >
> > SELECT STREAM amount,
> > (SELECT id FROM  inputstream1) AS field1 FROM inputstream2
> >
> > Translated to
> > LogicalProject(amount=[$1], c=[$4])
> >     LogicalJoin(condition=[true], joinType=[left])
> >       LogicalTableScan(table=[[inputstream1]])
> >       LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
> >         LogicalProject(user_id=[$0])
> >           LogicalTableScan(table=[[inputstream2]])
> >
> > Or from the same stream - perhaps interesting for applying some more
> > complex operations within the inner query
> >
> > SELECT STREAM amount,
> > (SELECT id FROM  inputstream1) AS field1 FROM inputstream1
> >
> > Translated to
> > LogicalProject(amount=[$1], c=[$4])
> >     LogicalJoin(condition=[true], joinType=[left])
> >       LogicalTableScan(table=[[inputstream1]])
> >       LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
> >         LogicalProject(user_id=[$0])
> >           LogicalTableScan(table=[[inputstream1]])
> >
> > Or used to do the projection
> > SELECT STREAM amount, c  FROM (SELECT *,id AS c FROM inputstream1)
> >
> > Translated to
> >   LogicalProject(amount=[$1], c=[$5])
> >     LogicalProject(time=[$0], amount =[$1], date=[$2], id =[$4], c=[$5])
> >       LogicalTableScan(table=[[inputstream1]])
> >
> >
> > Or in the future even
> > SELECT STREAM amount, myagg FROM  (SELECT STREAM *, SUM(amount) OVER
> > window AS myagg FROM inputstream1) ...
> >
> >
> >
> >
>
