flink-dev mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: STREAM SQL inner queries
Date Thu, 02 Feb 2017 23:14:53 GMT
Hi Radu,

I think mostly processing-time queries will fall into the category of
queries that never require updating a previously emitted result.
Event-time queries have to prepare for late arriving data (which is not
possible with processing time), and use cases that require event time
usually do not want to drop late data.

The query semantics that you intended for your first example in this thread
should be implementable without updates for processing time.
However, the query would need to be specified differently, similar to the
following one, which was adapted from the Temporal Table document [1]:

SELECT i2.amount, i1.id
FROM
  InputStream2 AS i2,
  InputStream1 AS i1
WHERE procTime(i1) = (
  SELECT MAX(procTime(i1_2))
  FROM InputStream1 AS i1_2
  WHERE procTime(i1_2) <= procTime(i2))

Note that the query is not a simple left join, i.e., the number of
returned rows is larger than the cardinality of InputStream2 if more than
one row in InputStream1 has the same procTime() (which might be OK if
InputStream1 is only slowly changing).
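The intended semantics of the rewritten query can be sketched with a small simulation (hypothetical Python, not Flink code; procTime is modeled as an explicit timestamp on each record):

```python
# Sketch of the query above: each row of InputStream2 joins with the
# InputStream1 row(s) carrying the largest processing time that is <= its
# own processing time. Names and the (proc_time, value) layout are
# illustrative assumptions, not Flink API.

def temporal_join(stream1, stream2):
    """stream1/stream2: lists of (proc_time, value), ordered by proc_time."""
    results = []
    for t2, amount in stream2:
        # SELECT MAX(procTime(i1_2)) ... WHERE procTime(i1_2) <= procTime(i2)
        eligible = [t1 for t1, _ in stream1 if t1 <= t2]
        if not eligible:
            continue  # no matching InputStream1 row yet
        latest = max(eligible)
        # all stream1 rows at that processing time join (hence not a 1:1 join)
        for t1, ident in stream1:
            if t1 == latest:
                results.append((amount, ident))
    return results

stream1 = [(1, "id1"), (3, "id2")]
stream2 = [(2, 10), (4, 11)]
print(temporal_join(stream1, stream2))  # [(10, 'id1'), (11, 'id2')]
```

Feeding two InputStream1 rows with the same proc_time yields two output rows for a single InputStream2 row, illustrating the cardinality caveat above.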

Also note that we have to think more about the procTime() function, which
is intended to be just a marker for processing time.
However, in the case of join queries we need to indicate which table we
refer to, so this detail needs to be fleshed out.

I think in principle, we could start to work on such joins, but given that
the condition is not trivial and involves a correlated subquery it might
take more time than initially expected to implement the optimization and
translation of such joins.

Best, Fabian

[1]
https://docs.google.com/document/d/1RvnLEEQK92axdAaZ9XIU5szpkbGqFMBtzYiIY4dHe0Q

2017-01-31 22:47 GMT+01:00 Radu Tudoran <radu.tudoran@huawei.com>:

> Hi,
>
> I understand the logic, and indeed considering the "batch and stream query
> equality" it makes sense to go with the version you have proposed (with the
> materialized view for inputstream2).
>
> You also mentioned there might be some queries that will never require to
> update previously emitted results such as queries that discard late
> arriving data and do not compute early results. For these we can and should
> apply runtime optimizations and simply append to the previously emitted
> records.
>
> Should we thus try to identify such queries now and provide the inner
> query implementation for these? Or should we wait and see how we deal with
> materialized views first?
> Basically, I am asking: what should we do now?
>
> BTW - thanks for the patience for the discussion and for brainstorming on
> this!
>
> -----Original Message-----
> From: Fabian Hueske [mailto:fhueske@gmail.com]
> Sent: Tuesday, January 31, 2017 3:45 PM
> To: dev@flink.apache.org
> Subject: Re: STREAM SQL inner queries
>
> Hi,
>
> If the goal is that the materialized result of a streaming query should be
> equivalent to the result of a batch query on the materialized input, we
> need to update previously emitted data.
> Only appending to the already emitted results will not work in most of the
> cases.
>
> In case of the join query you proposed
>
> SELECT STREAM amount,
> (SELECT id FROM  inputstream1 ORDER BY time LIMIT 1) AS field1 FROM
> inputstream2
>
> we would need to fully materialize inputstream2 (and all emitted result
> rows). Inputstream1 would not need to be materialized.
> When a new record from inputstream1 arrives, we would need to discard
> all previously emitted rows and join the whole inputstream2 with the new
> single value.
> This of course does not work in practice. Hence, the query would not be
> allowed by the optimizer.
>
> The problem could be fixed by explicitly specifying the join condition,
> that you are implicitly expecting, i.e., that each record of inputstream2
> is joined with the current single-value of inputstream1.
> If we would rely on the engine to implicitly apply this join condition by
> not updating previously emitted records, there would be two major problems:
>
> 1) the query would not compute what it states (given batch and stream query
> equality)
> 2) the result would not be well defined and arbitrary, i.e., it does not
> solely depend on the data and the query but on the query processors,
> ingestion speed, etc.
>
> There might be some queries that will never require to update previously
> emitted results such as queries that discard late arriving data and do not
> compute early results.
> For these we can and should apply runtime optimizations and simply append
> to the previously emitted records.
>
> The dynamic table document proposes two output modes:
> - update mode by key
> - append/retract mode which retracts invalid results and appends new
> results.
>
> A mode that only appends would of course be possible to implement but
> would suffer from inconsistent semantics for most queries.
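The two output modes mentioned above can be sketched with a running count per key as the query (illustrative Python; the mode names and record tags are invented, not Flink API):

```python
# Sketch of the two output modes: "update mode by key" emits an upsert per
# key, while "append/retract mode" retracts the now-invalid result and
# appends the new one. Hypothetical simulation, not Flink code.
from collections import defaultdict

def run_count_query(events, mode):
    """events: list of user keys; returns the emitted change records."""
    counts = defaultdict(int)
    out = []
    for user in events:
        old = counts[user]
        counts[user] += 1
        if mode == "update":
            out.append(("upsert", user, counts[user]))   # update mode by key
        else:
            if old:
                out.append(("retract", user, old))       # retract invalid result
            out.append(("add", user, counts[user]))      # append new result
    return out

print(run_count_query(["a", "a", "b"], "retract"))
```

An append-only mode would simply drop the `retract` records, which is exactly why it yields inconsistent results for most queries.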
>
> Best, Fabian
>
>
>
> 2017-01-31 13:41 GMT+01:00 Radu Tudoran <radu.tudoran@huawei.com>:
>
> > Hi,
> >
> > I was thinking about this reply...
> > I am not sure I understand exactly why you would need to keep
> > the whole state for Option 2. From my point of view this is not needed
> > (and I see this as the easy case). The main reason is that you have
> > the SINGLE_VALUE operator, which would imply that you do not keep the
> > whole state but rather update a single value. This is of course valid
> > only for the operators that do not require re-applying a full
> > aggregation. For example, it would work for MIN, MAX, or value selection
> > (e.g., last value), but would not work for SUM, AVERAGE, or
> > COUNT, unless it is an unbounded aggregation where you only update with
> > the new values. Basically, we could fail/throw an exception, just as
> > for the dynamic tables, in case there are not enough resources to
> > compute the query.
> >
> > Nevertheless, I see the discussion is binding this to the concept
> > of dynamic tables. In this case I would suggest that the
> > distinction between the two options be made based on the existence
> > of an ID in the stream. This is the idea that makes the
> > differentiation between append tables and update tables. We could use
> > the same here.
> >
> > If the inner stream on which we apply the inner query has an ID, then
> > option 1 (recompute and apply updates based on retraction and all
> > the rest); otherwise, option 2 (only make an update in the operator
> > that keeps the single value - if this is possible).
> >
> > Best regards,
> >
> > Dr. Radu Tudoran
> > Senior Research Engineer - Big Data Expert IT R&D Division
> >
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > European Research Center
> > Riesstrasse 25, 80992 München
> >
> > E-mail: radu.tudoran@huawei.com
> > Mobile: +49 15209084330
> > Telephone: +49 891588344173
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> > Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> > Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> > This e-mail and its attachments contain confidential information from
> > HUAWEI, which is intended only for the person or entity whose address is
> > listed above. Any use of the information contained herein in any way
> > (including, but not limited to, total or partial disclosure,
> reproduction,
> > or dissemination) by persons other than the intended recipient(s) is
> > prohibited. If you receive this e-mail in error, please notify the sender
> > by phone or email immediately and delete it!
> >
> >
> > -----Original Message-----
> > From: Fabian Hueske [mailto:fhueske@gmail.com]
> > Sent: Monday, January 30, 2017 8:39 PM
> > To: dev@flink.apache.org
> > Subject: Re: STREAM SQL inner queries
> >
> > Hi Radu,
> >
> > Updates of the result (materialized view) are not always simple appends.
> > If the query is a non-windowed aggregation or a windowed aggregation (or
> > join) with late data, some parts of the result need to be removed or
> > updated.
> > I think in order to implement the second option, we would need to emit
> > the complete result for every update because we do not know which parts
> > of the previous view became invalid. This is not practical, because it
> > would mean holding the complete result as state and emitting the
> > complete result for every update.
> >
> > In contrast, the first option sends retraction and update records to
> > update the latest view.
> > Moreover, we only need to hold those results as state that might be
> > updated and not the complete result.
> >
> > I agree that the discussion helps a lot.
> >
> > Best, Fabian
> >
> > 2017-01-30 15:49 GMT+01:00 Radu Tudoran <radu.tudoran@huawei.com>:
> >
> > > Hi,
> > >
> > > I would like to ask for a further clarifications about the statement:
> > > " a streaming query should be equivalent to the result of a batch
> > > query that is executed on the materialized stream "
> > >
> > > I do agree with the principle, but the question that I would like to
> > > ask is how we interpret the relation between a stream and the
> > > materialized view of the stream at some point. If we consider that we
> > > materialize the view on the elements we received on the stream until
> > > moment X (let's say it had elements 1 2 3) and we apply an SQL query,
> > > indeed this should give the exact same result as if 1 2 3 were in a
> > > database/batch and we applied the same logic. However, some time later
> > > in the future, if we receive another element (e.g., 4), do we have the
> > > same materialized view, which we update, or do we consider a new
> > > state, a new materialized view, and therefore a new scenario?
> > > Basically, assuming we take the last value, we can have two options:
> > >
> > > Option 1) At moment X the output is 3 (the last value of the
> > > materialized view of 1 2 3 is 3), and at moment X+1 when 4 arrives,
> > > the last value remains unchanged (it is the same materialized view).
> > > Option 2) At moment X the output is 3, and at moment X+1 when 4
> > > arrives, the last value is updated to 4 (it is a new materialized
> > > view, and the output is what we would get by applying the SQL query
> > > on the batch case with all elements 1 2 3 4).
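Option 2, the one the thread converges on, amounts to recomputing the batch query against the grown materialized view on every arrival. A minimal sketch (hypothetical helper name, not Flink code):

```python
# Sketch of Option 2: on each new element, the materialized view grows and
# the batch query "last value" is re-evaluated against the new view.

def last_value_option2(stream):
    view, results = [], []
    for x in stream:
        view.append(x)            # the materialized view evolves with the stream
        results.append(view[-1])  # batch "last value" query on the current view
    return results

print(last_value_option2([1, 2, 3, 4]))  # [1, 2, 3, 4]
```

Under Option 1, by contrast, the result computed at moment X would never change as further elements arrive.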
> > >
> > > I would assume (based on previous discussions and the panel in flink
> > > forward) that we rather go for option 2. The correct output of a SQL
> > > query on a stream is that one would create a materialized view at that
> > > point in time and apply the query in batch mode. When a new element
> > > arrives (stream
> > > evolves) then we will get a new materialized view.
> > >
> > > If this is the case, as per my assumption, then I would say that
> > > SINGLE_VALUE should be continuously updated as the stream on top of
> > > which it is applied evolves.
> > >
> > > My 2cents (anyway - I think the discussion is very useful and
> > > hopefully applicable also for other operators/scenarios that we are
> > > going to
> > > implement)
> > >
> > >
> > >
> > >
> > > -----Original Message-----
> > > From: Fabian Hueske [mailto:fhueske@gmail.com]
> > > Sent: Monday, January 30, 2017 2:09 PM
> > > To: dev@flink.apache.org
> > > Subject: Re: STREAM SQL inner queries
> > >
> > > Hi Radu,
> > >
> > > I think it is most important to get the semantics of a streaming query
> > > right.
> > > In my opinion, the result of a streaming query should be equivalent to
> > > the result of a batch query that is executed on the materialized
> stream.
> > > It should not matter whether you append the records received from a
> > > Kafka topic to a table and execute a batch query on that table or if
> > > you do run the same query continuously on the Kafka topic.
> > >
> > > It is correct, that some queries become too expensive to compute if we
> > > implement these semantics.
> > > However, this would be the price to pay for stream-batch consistent
> > > semantics.
> > >
> > > Regarding the inner query case. I think a query should yield the same
> > > result, regardless of whether it is an inner or outer query.
> > > This is one of the core principles of SQL, that I would not change.
> > >
> > > Best, Fabian
> > >
> > > 2017-01-30 12:54 GMT+01:00 Radu Tudoran <radu.tudoran@huawei.com>:
> > >
> > > > Hi Fabian,
> > > >
> > > > Thanks for the link and for the remarks.
> > > >
> > > > I do not imagine the behavior of the inner query necessarily along
> > > > the lines you describe. I specifically refer to "This applies as
> > > > well for the inner query. However, as the result of the inner query
> > > > evolves, also the result of the join needs to be constantly
> > > > recomputed. Hence, for every new result of (SELECT x FROM input1
> > > > ORDER BY time LIMIT 1), we would need to emit an update for each
> > > > record that was joined before."
> > > >
> > > > If we consider such a working scenario, then the behavior would be
> > > > something like the one below, if I understand correctly. Take for
> > > > example the query "SELECT STREAM amount, (SELECT id FROM
> > > > inputstream1) AS field1 FROM inputstream2"
> > > >
> > > > Stream1      Stream2    Output
> > > >              Id1
> > > > User1,10                (10,Id1)
> > > > User2,11                (11,Id1)
> > > >              Id3        (10,Id3), (11,Id3)
> > > > User3,9                 (9,Id3)
> > > > ...
> > > >
> > > > ...regardless of how we express the logic of the inner query
> > > > (whether we use LIMIT 1 or not), I would expect that the outputs
> > > > that were emitted are not retracted or modified in the future. In
> > > > the previous example, the updates (10,Id3), (11,Id3) should never
> > > > happen. With this in mind, although the inner query is translated to
> > > > a LogicalJoin operator, the functionality is more similar to a union
> > > > or a coFlatMap, where we only use one input as the holder of what to
> > > > associate in the future for the other. Anyway, I do not see the need
> > > > for any buffers (as in the general case of joins) to compute the
> > > > content for creating the output from the inner query.
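The append-only, coFlatMap-like behavior described here (one input only updates a held value, the other is enriched with it, and nothing already emitted is ever retracted) can be sketched as follows (illustrative Python with invented source tags, not Flink code):

```python
# Sketch of the single-value join without buffers: the "id" input updates
# a held value; each "amt" record is joined with the current value once
# and the output is never retracted or modified later.

def single_value_join(merged):
    """merged: time-ordered list of ("id", ident) or ("amt", amount) events."""
    current_id = None
    out = []
    for source, value in merged:
        if source == "id":
            current_id = value               # just replace the held single value
        elif current_id is not None:
            out.append((value, current_id))  # emit once; never updated
    return out

events = [("id", "Id1"), ("amt", 10), ("amt", 11), ("id", "Id3"), ("amt", 9)]
print(single_value_join(events))  # [(10, 'Id1'), (11, 'Id1'), (9, 'Id3')]
```

Note that no `(10, Id3)` or `(11, Id3)` updates appear: the arrival of `Id3` only affects later amounts.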
> > > >
> > > > Regarding your previous comment about failing based on the
> > > > SINGLE_VALUE verification: this is also something we just need to
> > > > agree on. After all, as the implementation is decoupled from the
> > > > parsing of the query, we can implement either behavior: throw an
> > > > error when a second element or update arrives on the second stream,
> > > > or just update the single-value state for future use.
> > > >
> > > > All in all, I think we just need to clarify the expectations we have.
> > > > Please let me know what do you think.
> > > >
> > > > I agree with the approach of starting small - even with some very
> > > > limited cases when we support inner queries and then extend or
> > > > define the general cases.
> > > >
> > > >
> > > >
> > > > -----Original Message-----
> > > > From: Fabian Hueske [mailto:fhueske@gmail.com]
> > > > Sent: Monday, January 30, 2017 12:33 PM
> > > > To: dev@flink.apache.org
> > > > Subject: Re: STREAM SQL inner queries
> > > >
> > > > Hi Radu,
> > > >
> > > > I thought about your join proposal again and think there is an issue
> > > > with the semantics.
> > > >
> > > > The problem is that the result of a query is recomputed as new data
> > > > arrives in the dynamic table.
> > > > This applies as well for the inner query. However, as the result of
> > > > the inner query evolves, also the result of the join needs to be
> > > > constantly recomputed. Hence, for every new result of (SELECT x FROM
> > > > input1 ORDER BY time LIMIT 1), we would need to emit an update for
> > > > each record that was joined before.
> > > >
> > > > In order to prevent this, the join would need to have a time-based
> > > > join predicate defining that a tuple of the outer query should join
> > > > with the current value of the inner query at the time of its own
> > > > timestamp. Such a predicate can be expressed in SQL, but this is
> > > > quite cumbersome.
> > > >
> > > > Julian Hyde (Apache Calcite committer) discussed similar use cases
> > > > in a document and proposed something called Temporal Tables [1].
> > > > In some sense, the proposed dynamic tables are a special case of
> > > > temporal table always reflecting the current point in time (i.e.,
> > > > not a previous point in time).
> > > >
> > > > Best, Fabian
> > > >
> > > > [1]
> > > > https://docs.google.com/document/d/1RvnLEEQK92axdAaZ9XIU5szpkbGqF
> > > > MBtzYiIY4dHe0Q
> > > >
> > > > 2017-01-27 21:13 GMT+01:00 Fabian Hueske <fhueske@gmail.com>:
> > > >
> > > > > Hi Radu,
> > > > >
> > > > > I was a bit surprised that Calcite's parser accepted your query.
> > > > > Hence, I checked the Calcite plan and had a look at the
> > > > > documentation of Calcite's SqlSingleValueAggFunction:
> > > > >
> > > > > > SINGLE_VALUE aggregate function returns the input value if there
> > > > > > is only
> > > > > one value in the input; Otherwise it triggers a run-time error.
> > > > >
> > > > > So the query would only pass if the inner query returns a single
> > > > > value (which would usually not be the case for a stream).
> > > > > The SINGLE_VALUE check is not added to the plan if the inner query
> > > > > is guaranteed to return a single value (LIMIT 1, global
> > > > > aggregation).
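The quoted SINGLE_VALUE behavior can be sketched like this (illustrative Python; treating empty input as NULL follows standard scalar-subquery behavior and is an assumption beyond the quoted documentation, which only describes the multi-row error):

```python
# Sketch of Calcite's SINGLE_VALUE aggregate as described above: return the
# lone input value, or trigger a run-time error if there is more than one.

def single_value(rows):
    it = iter(rows)
    try:
        first = next(it)
    except StopIteration:
        return None  # empty input modeled as NULL (assumption, see lead-in)
    for _ in it:
        raise RuntimeError("SINGLE_VALUE: more than one value in input")
    return first

print(single_value([42]))  # 42
```

On a stream, where the inner input keeps growing, the error branch would fire as soon as a second value arrives, which is why the check matters here.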
> > > > >
> > > > > Anyway, I agree that we could start to add the simple cases of
> > > > > these joins for processing time.
> > > > > For event-time, I think we need to consider late arriving data and
> > > > > support retraction values.
> > > > >
> > > > > Best, Fabian
> > > > >
> > > > > 2017-01-27 10:43 GMT+01:00 Radu Tudoran <radu.tudoran@huawei.com>:
> > > > >
> > > > >> Hi all,
> > > > >>
> > > > >> Thanks for the feedback!
> > > > >>
> > > > >> I agree that we should get the semantics right and after we can
> > > > >> implement it. I think it would be quite useful. Now, regarding
> > > > >> the
> > > > remarks you made:
> > > > >>
> > > > >>
> > > > >> "> SELECT STREAM amount, (SELECT id FROM  inputstream1) AS field1
> > > > >> FROM
> > > > >> inputstream2
> > > > >> you are suggesting that the subquery (SELECT id FROM
> > > > >> inputstream1) returns a single value (I assume the last received
> > > > >> value). I would interpret the query differently and expect it to
> > > > >> return the values of all rows of the inputstream1 up to the
> current
> > point in time.
> > > > >> "
> > > > >>
> > > > >> It is a good point. It is not so much that I wanted to suggest
> > > > >> that this should be the syntax to use - I just relied on the
> > > > >> logical operators that Calcite has parsed the query into (JOIN +
> > > > >> SINGLE_VALUE). Based on this logical translation, I would say the
> > > > >> correct implementation is to return one value, not necessarily
> > > > >> the whole content of the stream. Anyway, we are not restricted to
> > > > >> this, as we could potentially use different rules in Calcite to
> > > > >> alter the resulting plan.
> > > > >>
> > > > >> However, if we decide that such queries should return the whole
> > > > >> stream rather than a single value - we are indeed tapping in the
> > > > >> problem of potentially unbounded cases. For this I do agree that
> > > > >> the approach you proposed to rely on dynamic tables is very good.
> > > > >> In such a case we would just pass to the upper operators the
> > > > >> entire content of
> > > > the dynamic table.
> > > > >> For that matter it works also for the single value (as the table
> > > > >> would contain only one value). However, for the simple case of
> > > > >> returning a single value we can provide even now an
> > > > >> implementation and we do not need to wait until the full
> > > > >> functionality of dynamic
> > > > tables is provided.
> > > > >>
> > > > >> At the same time, I also agree that the syntax "a FROM
> > > > >> inputstream ORDER BY time LIMIT 1" is elegant. I have no issue
> > > > >> with considering inner queries to be translated like this only
> > > > >> when they have "LIMIT 1" specified, or only when they are
> > > > >> provided in such a form.
> > > > >>
> > > > >> I will wait for additional remarks so that we all agree on a
> > > > >> specific semantic, and then I will push this into a JIRA issue to
> > > > >> be further reviewed and validated.
> > > > >>
> > > > >> Best regards,
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >> -----Original Message-----
> > > > >> From: Fabian Hueske [mailto:fhueske@gmail.com]
> > > > >> Sent: Thursday, January 26, 2017 4:51 PM
> > > > >> To: dev@flink.apache.org
> > > > >> Subject: Re: STREAM SQL inner queries
> > > > >>
> > > > >> Hi everybody,
> > > > >>
> > > > >> thanks for the proposal Radu.
> > > > >> If I understood it correctly, you are proposing a left join
> > > > >> between a stream and a single value (which is computed from a
> > > > >> stream).
> > > > >> This makes sense and should be a common use case.
> > > > >>
> > > > >> However, I think some of your example queries do not return a
> > > > >> single value as required for the join.
> > > > >>
> > > > >> In your example:
> > > > >> > SELECT STREAM amount, (SELECT id FROM  inputstream1) AS field1
> > > > >> > FROM
> > > > >> inputstream2
> > > > >>
> > > > >> you are suggesting that the subquery (SELECT id FROM
> > > > >> inputstream1) returns a single value (I assume the last received
> > > > >> value).
> > > > >> I would interpret the query differently and expect it to return
> > > > >> the values of all rows of the inputstream1 up to the current
> > > > >> point in time.
> > > > >> IMO, a query like "SELECT a FROM inputstream ORDER BY time
> > > > >> LIMIT 1" would capture the semantics better.
> > > > >>
> > > > >> The subquery
> > > > >> > (SELECT AVERAGE(amount) OVER (ORDER BY timestamp RANGE
> > > > >> > INTERVAL 1 HOUR PRECEDING) AS hour_sum FROM inputstream)
> > > > >>
> > > > >> has a similar problem and would return one row for each record of
> > > > >> inputstream, i.e., not a single value.
> > > > >>
> > > > >> Anyway, if we get the semantics of the query that computes the
> > > > >> single value right, I think this type of join should be well
> > > > >> covered by the dynamic table proposal.
> > > > >> The single value input will be a dynamic table (of constant size
> > > > >> = 1) which is continuously updated by the engine.
> > > > >> Joining this table to a dynamic (append) table will result in
> > > > >> a continuously growing dynamic table, which can be emitted as a
> > > > >> stream.
> > > > >>
> > > > >> This would look very similar to what you proposed, but we would
> > > > >> need to make sure that the single-value query actually returns a
> > > > >> single value.
> > > > >>
> > > > >> @Xingcan Thanks for your feedback.
> > > > >> I would suggest to move the general discussion about the dynamic
> > > > >> table proposal to the thread that Radu started (I responded there
> > > > >> a few
> > > > minutes
> > > > >> ago).
> > > > >>
> > > > >> Just a few comments here: by logically converting a stream into
> > > > >> a dynamic table, we have well-defined semantics for operations
> > > > >> such as aggregations and joins.
> > > > >> However, you are right that this does not mean that we can
> > > > >> efficiently apply all operations on dynamic tables that we can
> > > > >> apply on an actual batch table. Some operations are just too
> > > > >> expensive or require too much state to be performed in a
> > > > >> streaming fashion. So yes, there will be some restrictions, but
> > > > >> that is rather due to the nature of stream processing than to the
> > > > >> idea of dynamic tables, IMO.
> > > > >>
> > > > >> Best,
> > > > >> Fabian
> > > > >>
> > > > >>
> > > > >> 2017-01-26 11:33 GMT+01:00 Xingcan <xingcanc@gmail.com>:
> > > > >>
> > > > >> > Hi all,
> > > > >> >
> > > > >> > I've read the document about dynamic tables. Honestly, I think
> > > > >> > it's well-defined and ingeniously compromises between batch and
> > > > >> > stream. I have two questions about the design.
> > > > >> >
> > > > >> > 1) Though it's fine to take the stream as a snapshot of a
> > > > >> > dynamic table, a table is essentially a set while a stream is
> > > > >> > essentially an ordered list (with xxTime). I'm not sure if the
> > > > >> > operations on a set will all suit a list (e.g., union or
> > > > >> > merge?). Of course, we can add an "order by time" to all SQL
> > > > >> > instances, but will that be suitable?
> > > > >> >
> > > > >> > 2) As Radu said, I also think the inner query is essential for
> > > > >> > a query language. (I didn't see any select from (select) in the
> > > > >> > document.) The problem is that SQL is based on a closure
> > > > >> > theory, while we cannot prove that for a stream. Can the result
> > > > >> > of a stream operation be another input? It depends. The window
> > > > >> > operator will convert "point of time" events to "period of
> > > > >> > time" events, and I don't know if the nature of the data has
> > > > >> > changed. Also, the partial emission will lead to heterogeneous
> > > > >> > results.
> > > > >> >
> > > > >> > BTW, the "Emission of dynamic tables" section seem to be a
> > > > >> > little incompatible with the whole document...
> > > > >> >
> > > > >> > Best,
> > > > >> > Xingcan
> > > > >> >
> > > > >> > On Thu, Jan 26, 2017 at 6:13 PM, Radu Tudoran
> > > > >> > <radu.tudoran@huawei.com>
> > > > >> > wrote:
> > > > >> >
> > > > >> > > Hi Shaoxuan,
> > > > >> > >
> > > > >> > > Thanks for the feedback!
> > > > >> > > Regarding the proposal for relational queries that you
> > > > >> > > referenced, I am a bit confused with respect to its purpose
> > > > >> > > and evolution relative to the current implementation of
> > > > >> > > stream SQL: is it supposed to replace this implementation, or
> > > > >> > > to complement it? But I will send another email about this,
> > > > >> > > as I guess it can be a standalone discussion thread.
> > > > >> > >
> > > > >> > > Also, regarding the join stream-to-stream I intend to start
> > > another
> > > > >> > > discussion about this such that we can decide all together if
> > > > >> > > we
> > > can
> > > > >> > start
> > > > >> > > some implementation/design now or we need to wait.
> > > > >> > >
> > > > >> > > Now, regarding the inner queries and the points you raised.
> > > > >> > > It is true that in general an inner join would work like any
> > > > >> > > other join (which obviously requires some buffering
> > > > >> > > capabilities and mechanisms to restrict the infinite growth
> > > > >> > > of the join state). However, at least some cases of inner
> > > > >> > > queries can be supported without the need for buffering
> > > > >> > > mechanisms or full support for inner join / left join.
> > > > >> > > Basically, the logical operator into which an inner query is
> > > > >> > > translated (a left join with an always-true condition) is to
> > > > >> > > some extent more similar to UNION (and the union
> > > > >> > > implementation) than to the implementation we will have for
> > > > >> > > joins. This is why I believe we can already provide support
> > > > >> > > for this (I also tested a PoC implementation internally, and
> > > > >> > > it works). For examples of when we could use this, please see
> > > > >> > > the next two examples. Please let me know what you think and
> > > > >> > > whether it is worth designing the JIRA issue with some more
> > > > >> > > details (including the technical details).
> > > > >> > >
> > > > >> > > Consider the example below:
> > > > >> > >
> > > > >> > > SELECT STREAM user
> > > > >> > >         FROM inputstream
> > > > >> > >         WHERE amount > (SELECT STREAM Min(amount2) FROM
> > > > >> > > inputstream2)
> > > > >> > >
> > > > >> > > The point of this is to restrict the values you are selecting
> > > based
> > > > >> > > on some value that you have from the other stream. Consider
> > > > >> > > the values below that come in each stream
> > > > >> > >
> > > > >> > > Inputstream       inputstream2    Result
> > > > >> > > User1,100                         user1 (there is no value in
> > > > >> > >                                   inputstream2 yet and the left
> > > > >> > >                                   join should not restrict the
> > > > >> > >                                   output in this case)
> > > > >> > >                   X,x,10          nothing, as there is no event
> > > > >> > >                                   in inputstream to be output.
> > > > >> > >                                   Min becomes 10 from now on.
> > > > >> > > User2, 20                         user2 (20 is greater than 10,
> > > > >> > >                                   the minimum retained from
> > > > >> > >                                   inputstream2)
> > > > >> > >                   X,x,20          nothing, as there is no event
> > > > >> > >                                   in inputstream to be output.
> > > > >> > >                                   Min remains 10.
> > > > >> > >                   X,x,5           nothing, as there is no event
> > > > >> > >                                   in inputstream to be output.
> > > > >> > >                                   Min becomes 5 from now on.
> > > > >> > > User3, 8                          User3 (8 is greater than 5)
> > > > >> > > ....
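The running-min semantics walked through above can be sketched as a small standalone simulation (illustrative only: the stream tags `'s1'`/`'s2'`, tuple shapes, and function name are assumptions, not Flink or Calcite API):

```python
# Minimal sketch (not Flink/Calcite API) of the running-min inner-query
# semantics described above.

def run(events):
    """events: ('s1', user, amount) or ('s2', amount2) tuples in arrival order."""
    current_min = None
    out = []
    for ev in events:
        if ev[0] == 's2':
            # Inner query: maintain the running MIN(amount2) of inputstream2.
            current_min = ev[1] if current_min is None else min(current_min, ev[1])
        else:
            _, user, amount = ev
            # Left-join semantics: before any inputstream2 value arrives,
            # the subquery must not restrict the output.
            if current_min is None or amount > current_min:
                out.append(user)
    return out

events = [
    ('s1', 'user1', 100),
    ('s2', 10),
    ('s1', 'user2', 20),
    ('s2', 20),
    ('s2', 5),
    ('s1', 'user3', 8),
]
print(run(events))  # ['user1', 'user2', 'user3']
```

The output reproduces the Result column of the table above.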
> > > > >> > >
> > > > >> > >
> > > > >> > > The final goal of this is to be able, among other things, to
> > > > >> > > define multiple window computations on the same input stream.
> > > > Consider:
> > > > >> > >
> > > > >> > > SELECT STREAM user
> > > > >> > >         FROM inputstream
> > > > >> > >         WHERE (SELECT STREAM AVG(amount) OVER (ORDER BY
> > > > >> > > timestamp RANGE INTERVAL '1' HOUR PRECEDING) AS hour_avg FROM
> > > > >> > > inputstream) < amount
> > > > >> > >
> > > > >> > >
> > > > >> > > Assume you have the following events, each coming every 30
> > > > >> > > minutes:
> > > > >> > > User1, 100 -> Average is 100 and the topology that implements
> > > > >> > > the query outputs nothing (100 is not > 100)
> > > > >> > > User2, 10  -> Average is 55 and the topology that implements
> > > > >> > > the query outputs nothing (10 is not > 55)
> > > > >> > > User3, 40  -> Average is 25 ((10+40)/2) and the topology that
> > > > >> > > implements the query outputs User3 (40 is > 25) ....
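This walkthrough can be sketched as a standalone simulation. Note the strict lower window bound (`ts - 60 < t`) is an assumption chosen so the numbers match the example (it drops the event exactly one hour old); real RANGE framing may differ:

```python
# Sketch of the second example's semantics: emit a user when their amount
# exceeds the average over the preceding hour (including the current row).
# Function name and tuple shapes are illustrative assumptions.

def filter_above_hourly_avg(events):
    """events: (user, amount, ts_minutes) tuples sorted by timestamp."""
    out = []
    for user, amount, ts in events:
        # Window: all rows within the last hour, excluding the exact boundary.
        window = [a for _, a, t in events if ts - 60 < t <= ts]
        avg = sum(window) / len(window)
        if amount > avg:
            out.append(user)
    return out

events = [('user1', 100, 0), ('user2', 10, 30), ('user3', 40, 60)]
print(filter_above_hourly_avg(events))  # ['user3']
```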
> > > > >> > > Although the query as it is depends on aggregates and
> > > > >> > > windows, the operator to implement the inner query can be
> > > > >> > > implemented independently of functions that are contained in
> > > > >> > > the query. Also, there is no need for a window or buffering
> > > > >> > > to implement the logic for assembling the results
> > > > >> > from
> > > > >> > > the inner query.
> > > > >> > >
> > > > >> > >
> > > > >> > > Best regards,
> > > > >> > >
> > > > >> > > -----Original Message-----
> > > > >> > > From: Shaoxuan Wang [mailto:wshaoxuan@gmail.com]
> > > > >> > > Sent: Thursday, January 26, 2017 4:36 AM
> > > > >> > > To: dev@flink.apache.org
> > > > >> > > Subject: Re: STREAM SQL inner queries
> > > > >> > >
> > > > >> > >  Hi Radu,
> > > > >> > > Similar to the stream-stream join, this stream-stream inner
> > > > >> > > query does not seem to be well defined. It needs to provide at
> > > > >> > > least some kind of window bounds to complete the streaming SQL
> > > > >> > > semantics. If this is an unbounded join/select, a mechanism
> > > > >> > > for storing the infinite data has to be considered. I may not
> > > > >> > > fully understand your proposal.
> > > > >> > > Could you please provide more details about this inner query,
> > > > >> > > say, giving some examples of input and output? It would also
> > > > >> > > be great if you could explain the use case of this inner
> > > > >> > > query. That would help us understand the semantics.
> > > > >> > >
> > > > >> > > It should also be noted that we have recently decided to
> > > > >> > > unify stream and batch queries with the same regular (batch)
> > > > >> > > SQL. Therefore we have removed the support for the STREAM
> > > > >> > > keyword in Flink streaming SQL. In the past several months,
> > > > >> > > Fabian and Xiaowei Jiang have started to work on the future
> > > > >> > > relational queries on Flink streaming. Fabian has drafted a
> > > > >> > > very good design doc, https://goo.gl/m31kkE. The design is
> > > > >> > > based on a new concept of a dynamic table whose content
> > > > >> > > changes over time and can thereby be derived from streams.
> > > > >> > > With this dynamic table, a stream query can be expressed via
> > > > >> > > regular (batch) SQL. Besides some syntactic sugar, there is
> > > > >> > > not too much difference between a batch query and a stream
> > > > >> > > query (in terms of what part of a query is executed and
> > > > >> > > where). A stream query has additional characteristics in
> > > > >> > > terms of when to emit a result and how to refine the result,
> > > > >> > > considering retraction.
> > > > >> > >
> > > > >> > > Hope this helps and look forward to working with you on
> > > > >> > > streaming
> > > > SQL.
> > > > >> > >
> > > > >> > > Regards,
> > > > >> > > Shaoxuan
> > > > >> > >
> > > > >> > >
> > > > >> > > On Wed, Jan 25, 2017 at 9:49 PM, Radu Tudoran
> > > > >> > > <radu.tudoran@huawei.com>
> > > > >> > > wrote:
> > > > >> > >
> > > > >> > > > Hi all,
> > > > >> > > >
> > > > >> > > > I would like to open a JIRA issue (and then provide the
> > > > >> > > > implementation) for supporting inner queries. The idea is
> > > > >> > > > to be able to support SQL queries such as the ones
> > > > >> > > > presented in the scenarios below. The key point is that
> > > > >> > > > supporting inner queries would require implementations for:
> > > > >> > > >
> > > > >> > > > -> JOIN (type = left, condition = true) - Basically this
> > > > >> > > > is a simple implementation of a join function between 2
> > > > >> > > > streams that does not require any window support behind the
> > > > >> > > > scenes, as there is no condition on which to perform the
> > > > >> > > > join
> > > > >> > > >
> > > > >> > > > -> SINGLE_VALUE - this operator is required to provide one
> > > > >> > > > value to be further joined. In the context of streaming,
> > > > >> > > > this value should basically evolve with the contents of the
> > > > >> > > > window. This could be implemented with a flatmap function,
> > > > >> > > > as left joins also allow the mapping to produce null values
> > > > >> > > >
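The pairing of the always-true left join with SINGLE_VALUE can be sketched as a small standalone simulation (illustrative only: the `'outer'`/`'inner'` tags, tuple shapes, and function name are assumptions, not Flink API):

```python
# Illustrative sketch (not Flink API) of JOIN(type = left, condition = true)
# combined with SINGLE_VALUE: every outer record is paired with the current
# single value of the inner side, or None (SQL NULL) if none has arrived yet.

def left_join_single_value(events):
    """events: ('outer', amount) or ('inner', id_value) tuples in arrival order."""
    single_value = None  # evolves with the inner stream's contents
    out = []
    for side, value in events:
        if side == 'inner':
            single_value = value  # SINGLE_VALUE keeps the latest value
        else:
            # Left join: outer rows are never dropped; NULL is filled in.
            out.append((value, single_value))
    return out

events = [('outer', 100), ('inner', 'id1'),
          ('outer', 20), ('inner', 'id2'), ('outer', 8)]
print(left_join_single_value(events))  # [(100, None), (20, 'id1'), (8, 'id2')]
```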
> > > > >> > > > We can then extend this initial, simple implementation to
> > > > >> > > > provide support for joins in general (conditional joins,
> > > > >> > > > right joins, ...), or we can isolate this implementation to
> > > > >> > > > the specific case of inner queries and go with a totally
> > > > >> > > > new design for stream-to-stream joins (which might be
> > > > >> > > > needed, depending on the decision about how to support the
> > > > >> > > > conditional mapping)
> > > > >> > > >
> > > > >> > > > What do you think about this?
> > > > >> > > >
> > > > >> > > > Examples of scenarios to apply
> > > > >> > > >
> > > > >> > > > SELECT STREAM amount,
> > > > >> > > > (SELECT id FROM  inputstream1) AS field1 FROM inputstream2
> > > > >> > > >
> > > > >> > > > Translated to
> > > > >> > > > LogicalProject(amount=[$1], c=[$4])
> > > > >> > > >     LogicalJoin(condition=[true], joinType=[left])
> > > > >> > > >       LogicalTableScan(table=[[inputstream2]])
> > > > >> > > >       LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
> > > > >> > > >         LogicalProject(user_id=[$0])
> > > > >> > > >           LogicalTableScan(table=[[inputstream1]])
> > > > >> > > >
> > > > >> > > > Or from the same stream - perhaps interesting for applying
> > > > >> > > > some more complex operations within the inner query:
> > > > >> > > > SELECT STREAM amount,
> > > > >> > > > (SELECT id FROM  inputstream1) AS field1 FROM inputstream1
> > > > >> > > >
> > > > >> > > > Translated to
> > > > >> > > > LogicalProject(amount=[$1], c=[$4])
> > > > >> > > >     LogicalJoin(condition=[true], joinType=[left])
> > > > >> > > >       LogicalTableScan(table=[[inputstream1]])
> > > > >> > > >       LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
> > > > >> > > >         LogicalProject(user_id=[$0])
> > > > >> > > >           LogicalTableScan(table=[[inputstream1]])
> > > > >> > > >
> > > > >> > > > Or used to do the projection:
> > > > >> > > > SELECT STREAM amount, c FROM
> > > > >> > > > (SELECT *, id AS c FROM inputstream1)
> > > > >> > > >
> > > > >> > > > Translated to
> > > > >> > > >   LogicalProject(amount=[$1], c=[$5])
> > > > >> > > >     LogicalProject(time=[$0], amount=[$1], date=[$2],
> > > > >> > > >                    id=[$4], c=[$5])
> > > > >> > > >       LogicalTableScan(table=[[inputstream1]])
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > Or in the future even
> > > > >> > > > SELECT STREAM amount, myagg FROM (SELECT STREAM *,
> > > > >> > > > SUM(amount) OVER window AS myagg FROM inputstream1) ...
> > > > >> > > >
> > > > >> > > >
> > > > >> > > >
> > > > >> > > >