flink-dev mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: [DISCUSS] Defining the Semantics of StreamingSQL
Date Thu, 20 Oct 2016 12:55:12 GMT
Hi everybody,

I cross posted the proposal also to the Apache Calcite dev mailing list to
collect some feedback from the community.
Tyler Akidau (Apache Beam committer) responded and commented on the
proposal.

With Tyler's consent, I am moving our conversation from the Google Doc
comments to the mailing list to continue the discussion here.

Tyler commented on this sentence:

> "Result refinement does not affect the semantics of a query and should
> therefore not be part of the query. Instead it is a property of the query


I don't think I agree with this statement. For streams, the nature of
refinement is critical to defining their shape.
And for tables, as soon as you provide a parameter to configure when late
data are dropped, you've affected the semantics of the query.
As such, I would argue refinement is essential to defining query semantics
any time streams are involved, and thus a reasonable justification for
syntax extension, e.g. the EMIT WHEN proposal: https://docs.google.com/do

That said, extracting refinement semantics outside of the query seems like
a fair compromise for the case where you're trying to support streaming
within existing standard SQL syntax.


Yes, I know. We discussed this issue when drafting this proposal.
When the query starts to drop late data, the computed result will only be
an approximation of the defined query result. That should be clearly
pointed out.

Something that just came to my mind: shouldn't the watermark generation
mode also be part of the query definition?
Given the same query and same refinement and state cleanup configuration,
different watermark generation modes could lead to different results
because it essentially defines when data is late, right?


Re watermark generation: that's a very good question. Watermarks are going
to be a characteristic of each table/stream, but I think there are two
places where the user might have a voice in what the watermarks look like:

1. Sources: Watermarks are typically established at ingress time into the
system.
It's very reasonable that you might want to observe a given table/stream
with different watermark strategies (e.g. a 100th %ile watermark when you
want accuracy at the cost of latency, & a 99th %ile watermark when you want
better latency at the cost of correctness).
But it's not clear to me that this choice should be part of the query,
aside from choosing the appropriate table/stream to query. Not all sources
are going to support all watermark modes.
So I'd tend to think it would make more sense for the watermark mode to be
a property of the source, defined outside of the query (as other source
metadata are, e.g. HBase table name for an HBase connector).
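
The latency/accuracy trade-off between the watermark strategies mentioned
above can be sketched as follows. This is an illustrative model only (the
helper name and the lag-percentile heuristic are my own, not a Beam or
Flink API): the watermark trails processing time by the percentile-th
largest lateness observed so far.

```python
import math

def percentile_watermark(now, lags, percentile):
    """Heuristic watermark from observed lateness samples (lag =
    processing time - event time). percentile=1.0 trails by the
    worst lag ever seen: accurate, but one straggler adds latency
    for everyone. percentile=0.99 ignores the slowest 1% of
    records, advancing faster at the cost of marking them late."""
    lags = sorted(lags)
    idx = max(0, math.ceil(percentile * len(lags)) - 1)
    return now - lags[idx]

lags = [1, 2, 3, 4, 60]                # one straggler lagged 60s behind
percentile_watermark(100, lags, 1.00)  # waits for the straggler -> 40
percentile_watermark(100, lags, 0.80)  # ignores slowest 20% -> 96
```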

2. Grouping: Any time a grouping operation is performed (i.e. any time a
table is created from a stream, if using the semantics proposed in my doc
above), you can define how the grouping operation affects watermark
progress. In general, when grouping within a window, the range of valid
event times is [min timestamp of non-late data in the pane, ∞). In Beam, I
believe we're going to provide the choice of { MIN_NON_LATE_TIMESTAMP,
MAX_NON_LATE_TIMESTAMP, END_OF_WINDOW }, since those are the most relevant
ones (& you can always safely move the timestamp beyond the end of the
window later on). This choice I do believe should be a part of query, as it
directly relates to the way conversion from a stream to a table is
happening, which will affect the shape of any stream derived from that
table later on.
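
The three timestamp choices for a grouped result can be made concrete with
a small sketch (the function and mode names are illustrative, not Beam's
actual API):

```python
def window_result_timestamp(mode, pane_timestamps, window_end):
    """Event time assigned to a window's result record.
    pane_timestamps: timestamps of the non-late records in the pane."""
    if mode == "MIN_NON_LATE_TIMESTAMP":
        return min(pane_timestamps)   # lower bound of the valid range
    if mode == "MAX_NON_LATE_TIMESTAMP":
        return max(pane_timestamps)
    if mode == "END_OF_WINDOW":
        return window_end             # always safe to move beyond
    raise ValueError(mode)

# 1-hour tumbling window [0, 3600) with records at t=120 and t=3000:
window_result_timestamp("MIN_NON_LATE_TIMESTAMP", [120, 3000], 3600)  # 120
window_result_timestamp("END_OF_WINDOW", [120, 3000], 3600)           # 3600
```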

I think the existing (largely implicit) proposal here is actually that the
event time for a given window during grouping operations is defined by the
user when they specify the value of the rowtime column in the result. I
think the optimizer can figure out from that expression whether or not
rowtime will evaluate to something >= the minimum timestamp of non-late
data in the pane.
It might require a special aggregator that knows how to ignore late
records. But being able to have the timestamp of an aggregate defined via
the user-provided rowtime value would probably be preferable to a syntax
extension.
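
Such a late-aware aggregator could, in the simplest case, just skip
records whose event time is already behind the watermark at evaluation
time. A minimal sketch (names are hypothetical):

```python
def sum_ignoring_late(records, watermark):
    """Sum (event_time, value) pairs, treating anything with
    event_time < watermark as late and excluding it, so the
    result's rowtime can safely be the minimum included time."""
    return sum(v for t, v in records if t >= watermark)

# The record at t=5 is behind watermark 10 and is dropped:
sum_ignoring_late([(10, 1), (20, 2), (5, 7)], 10)  # -> 3
```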



For Flink's Stream SQL our plans were to require the input stream to
provide watermarks, i.e. treat watermarks as a property of the source, just
as you described.
It definitely makes sense to observe the same stream with different
watermark accuracies. In addition, the "Complete Result Offset" (as
proposed in our document) can also be used to tune result accuracy by
deferring the evaluation of a window. This would be similar to having
watermarks with higher accuracy.
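
Under the assumption that a window normally fires once the watermark
passes its end, the "Complete Result Offset" can be sketched as an extra
grace interval before firing (illustrative only, not Flink's actual
implementation):

```python
def window_fires(watermark, window_end, complete_result_offset):
    """Defer window evaluation: the window is only considered
    complete once the watermark has passed window_end plus the
    offset, giving slow records extra time to arrive."""
    return watermark >= window_end + complete_result_offset

window_fires(3600, 3600, 0)    # no offset: fires immediately -> True
window_fires(3600, 3600, 300)  # 5-minute offset: not yet -> False
window_fires(3900, 3600, 300)  # watermark caught up -> True
```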

For the timestamp assignment of window operations, our plan so far was to
use the semantics of Flink's DataStream API which is to assign
END_OF_WINDOW. The timestamp would be addressable in a system column
"rowtime", which cannot be modified, to ensure that users do not mess up
the event-time semantics.
But you are right, we could make the timestamp of window results
configurable by explicitly defining a "rowtime" attribute in the output,
just as in the examples of the document you shared:

SELECT SUM(score), team,
       MIN(rowtime) AS rowtime  -- new rowtime is minimum rowtime of grouped records
FROM Stream
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), team

That would of course mean that we cannot build upon the DataStream API
windows and need to implement our own window operators (which might be
inevitable in any case).
I think the way to go is to start with the implicit END_OF_WINDOW and add
the possibility to configure timestamps later.
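
The implicit END_OF_WINDOW default corresponds to a grouping like the
following sketch, where every result record's rowtime is the end timestamp
of its tumbling window (a simplified model, not the DataStream API
itself):

```python
def tumbling_sum(records, size):
    """Sum (event_time, value) records per tumbling window of
    `size`, emitting (rowtime, sum) with rowtime = END_OF_WINDOW."""
    out = {}
    for t, v in records:
        window_end = (t // size + 1) * size
        out[window_end] = out.get(window_end, 0) + v
    return sorted(out.items())

# Two 60s windows: [0, 60) collects 1+2, [60, 120) collects 5.
tumbling_sum([(10, 1), (20, 2), (70, 5)], 60)  # [(60, 3), (120, 5)]
```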

Best, Fabian

2016-10-18 16:43 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:

> Hi everybody,
> at Flink Forward we had a BOF session about StreamSQL.
> After the conference, some folks and I sat down and drafted a proposal for
> Flink's StreamSQL semantics.
> --> https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQ
> PW4tnl8THw6rzGUdaqU
> The proposal includes:
> - A definition for dynamic tables
> - How to convert streams into dynamic tables
> - How to convert dynamic tables into streams
> - How to query dynamic tables
> - Which types of queries to support
> - How to specify early results, update rate, and late refinements
> - How to control the size of the query state
> - How to write query results to external systems (Kafka, files, Cassandra,
> HBase, ...)
> - How to make a query result accessible via Flink's queryable kv-state
> - A few examples how StreamSQL queries can be defined
> The proposal does not include a workplan or task breakdown yet.
> This is something I'll work on in the next days.
> Please share your thoughts and opinions about the proposal on the mailing
> list.
> Thanks,
> Fabian
