samza-dev mailing list archives

From Julian Hyde <jh...@apache.org>
Subject Re: Hopping and tumbling windows in streaming SQL
Date Thu, 25 Jun 2015 20:44:17 GMT
Glad you like it. I've filled out the spec with some more examples, see below.

Here are the proposed functions:

* HOP(t, emit, retain)
* HOP(t, emit, retain, align)
* TUMBLE(t, emit)

TUMBLE(t, e) is equivalent to HOP(t, e, e).

HOP(t, e, r) is equivalent to HOP(t, e, r, TIME '00:00:00').
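
The two equivalences above can be sketched in Python. This is only an illustration of the intended semantics, not part of the proposal: the function names hop and tumble, the EPOCH origin, and the choice to represent a window as a (start, end) pair are all assumptions made for the example. It assumes windows end at align + k * emit and cover the preceding "retain" interval.

```python
from datetime import datetime, timedelta

# Assumption: alignment is measured from midnight of an arbitrary fixed day.
EPOCH = datetime(1970, 1, 1)

def hop(t, emit, retain, align=timedelta(0)):
    """Return every window [start, end) that contains timestamp t."""
    # Index of the first emit point strictly after t.
    k = (t - EPOCH - align) // emit + 1
    end = EPOCH + align + k * emit
    windows = []
    # Keep stepping forward while the window still reaches back to t.
    while end - retain <= t:
        windows.append((end - retain, end))
        end += emit
    return windows

def tumble(t, emit):
    """TUMBLE(t, e) is HOP(t, e, e): each row lands in exactly one window."""
    return hop(t, emit, emit)

t = datetime(2015, 6, 25, 0, 50)
for start, end in hop(t, timedelta(minutes=30), timedelta(hours=1, minutes=45)):
    print(start.time(), end.time())
```

With emit = 30 minutes and retain = 1:45 (the values used in Q3), a row at 00:50 falls into four windows, the first being [23:15, 01:00); with tumble it falls into exactly one.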

Q1. One hour tumbling window:

SELECT STREAM START(rowtime),
  COUNT(*)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR)

Emits a row at 01:00 containing rows in [00:00, 01:00);
emits a row at 02:00 containing rows in [01:00, 02:00), etc.

Q2. Same as Q1, expressed using HOP:

SELECT STREAM START(rowtime),
  COUNT(*)
FROM Orders
GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' HOUR)

Q3. Hopping window

SELECT STREAM START(rowtime),
  COUNT(*)
FROM Orders
GROUP BY HOP(rowtime,
    INTERVAL '30' MINUTE,
    INTERVAL '1:45' HOUR TO MINUTE)

Emits a row at 01:00 containing rows in [23:15, 01:00);
emits a row at 01:30 containing rows in [23:45, 01:30), etc.

Q4. Aligned hopping window

SELECT STREAM START(rowtime),
  COUNT(*)
FROM Orders
GROUP BY HOP(rowtime,
    INTERVAL '1:30' HOUR TO MINUTE,
    INTERVAL '2' HOUR, TIME '0:30')

Emits a row at 00:30 containing rows in [22:30, 00:30);
emits a row at 02:00 containing rows in [00:00, 02:00), etc.
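
To check the arithmetic, here is a small Python sketch that lists the windows an aligned HOP would emit. The helper name emit_schedule is hypothetical, and the sketch assumes that "align" is an offset from midnight that is smaller than "emit", so the first emit point of the day is midnight + align.

```python
from datetime import datetime, timedelta

def emit_schedule(day, emit, retain, align, count):
    """List the first `count` windows (start, end) emitted on `day`.

    Assumes align < emit, so the first emit point is day + align.
    """
    first_end = day + align
    return [
        (first_end + i * emit - retain, first_end + i * emit)
        for i in range(count)
    ]

day = datetime(2015, 6, 25)
for start, end in emit_schedule(
    day,
    emit=timedelta(hours=1, minutes=30),
    retain=timedelta(hours=2),
    align=timedelta(minutes=30),
    count=3,
):
    print(start.strftime("%H:%M"), end.strftime("%H:%M"))
```

For the arguments of this query the first two windows are [22:30, 00:30) and [00:00, 02:00), matching the description above; the third is [01:30, 03:30).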

Q5. Aligned tumbling window

TUMBLE does not have an align argument, so you need to use HOP.

SELECT STREAM START(rowtime),
  COUNT(*)
FROM Orders
GROUP BY HOP(rowtime,
    INTERVAL '1' HOUR,
    INTERVAL '1' HOUR,
    TIME '0:30')

Emits a row at 00:30 containing rows in [23:30, 00:30);
emits a row at 01:30 containing rows in [00:30, 01:30), etc.

Q6. Decaying average

SELECT STREAM END(rowtime),
  productId,
  SUM(unitPrice * EXP((rowtime - START(rowtime)) SECOND / INTERVAL '1' HOUR))
   / SUM(EXP((rowtime - START(rowtime)) SECOND / INTERVAL '1' HOUR))
FROM Orders
GROUP BY HOP(rowtime,
    INTERVAL '1' SECOND,
    INTERVAL '1' HOUR),
  productId

Emits a row at 00:00:00 containing rows in [23:00:00, 00:00:00);
emits a row at 00:00:01 containing rows in [23:00:01, 00:00:01).

The expression weights recent orders more heavily than older orders.
Extending the window from 1 hour to 2 hours or 1 year would have
virtually no effect on the accuracy of the result (but would use more
memory and compute).

Note that we can use START inside an aggregate function (SUM) because
its value is constant for all rows within a sub-total. Nesting would
not be allowed for typical aggregate functions (SUM, COUNT, etc.).
START and END behave more like the GROUPING() function in this
regard.
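
The weighted average in Q6 can be written out in Python for a single sub-total. This is a sketch under assumptions: the function name decaying_average and the (rowtime, unit_price) tuple layout are made up for the example, and the rows passed in are assumed to already belong to one window and one productId.

```python
import math
from datetime import datetime, timedelta

def decaying_average(rows, window_start, scale=timedelta(hours=1)):
    """Exponentially weighted average of unit_price over one window.

    rows: iterable of (rowtime, unit_price) pairs inside the window.
    The weight of a row is EXP((rowtime - window_start) / scale), so
    rows nearer the end of the window count for more.
    """
    numerator = 0.0
    denominator = 0.0
    for rowtime, unit_price in rows:
        weight = math.exp((rowtime - window_start) / scale)
        numerator += unit_price * weight
        denominator += weight
    return numerator / denominator

start = datetime(2015, 6, 25, 23, 0, 0)
rows = [
    (start, 10.0),                          # oldest row, weight 1
    (start + timedelta(minutes=30), 20.0),  # newer row, weight e^0.5
]
print(decaying_average(rows, start))
```

The result lies between the two prices but closer to 20, the price of the more recent order.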

Q7. Non-streaming query

HOP and TUMBLE were devised for a use case that occurs in streaming
SQL, but they can be used in non-streaming queries. For example,

SELECT START(rowtime),
  COUNT(*)
FROM Orders
WHERE rowtime BETWEEN '2015-01-01 00:00:00'
    AND '2015-01-18 00:00:00'
GROUP BY TUMBLE(rowtime,
    INTERVAL '2' HOUR)

This is similar to Q1 (here with a two-hour window and a filter on
rowtime), but omits the STREAM keyword, so it queries the table
containing historical orders.

Q8. Grouping sets

It should be possible to mix HOP and TUMBLE in with GROUPING SETS but
I haven't devised an example.

While we're on the subject of GROUPING SETS, I should state for the record:
* GROUPING SETS is valid for a streaming query provided that every
grouping set contains a monotonic expression.
* CUBE and ROLLUP are not valid for a streaming query, because they will
produce at least one grouping set that aggregates everything (like
"GROUP BY ()").

Maybe we should allow CUBE and ROLLUP with an understanding that some
levels of aggregation will never complete (because they have no
monotonic expressions) and thus will never be emitted.
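
The rule can be illustrated with a small Python sketch. It is not how a planner would implement the check: grouping sets are modeled simply as sets of column names, and the helper names cube, rollup and incomplete_sets are hypothetical. A grouping set is reported as incomplete when it contains no monotonic column.

```python
from itertools import combinations

def cube(columns):
    """All subsets of the columns, largest first, ending with the empty set."""
    return [
        set(combo)
        for size in range(len(columns), -1, -1)
        for combo in combinations(columns, size)
    ]

def rollup(columns):
    """Prefixes of the column list, ending with the empty set."""
    return [set(columns[:i]) for i in range(len(columns), -1, -1)]

def incomplete_sets(grouping_sets, monotonic):
    """Grouping sets with no monotonic column; these could never be emitted."""
    return [g for g in grouping_sets if not (g & monotonic)]

monotonic = {"rowtime"}
print(incomplete_sets([{"rowtime", "productId"}, {"rowtime"}], monotonic))
print(incomplete_sets(rollup(["rowtime", "productId"]), monotonic))
print(incomplete_sets(cube(["rowtime", "productId"]), monotonic))
```

The explicit GROUPING SETS list has no incomplete sets, ROLLUP has one (the empty set), and CUBE has two (the set containing only productId, and the empty set).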

Julian

On Thu, Jun 25, 2015 at 7:32 AM, Milinda Pathirage
<mpathira@umail.iu.edu> wrote:
> Hi Julian,
>
> This is a great improvement over the previous hopping window. Thanks for
> thinking through this. I would also like it if we could introduce a TUMBLE
> function with more control over how we define tumbling window size. With the
> current FLOOR-based model we have to perform date/time arithmetic to get
> tumbling windows such as 5-minute tumbling windows (maybe there is a better
> way that I don't know of). A TUMBLE function that can specify parameters
> such as window size would be nice. I am +1 for the other extensions as well.
>
> Thanks
> Milinda
>
> On Wed, Jun 24, 2015 at 6:32 PM, Julian Hyde <jhyde@apache.org> wrote:
>
>> Hi all,
>>
>> Forgive the cross-post. This is for Calcite devs interested in
>> streaming and Samza devs interested in SQL.
>>
>> I've been thinking some more about how to implement hopping and
>> tumbling windows in streaming SQL. I was previously at a loss to find
>> a concise syntax that is consistent with SQL semantics, but I have
>> found a syntax that I think can please everyone.
>>
>> Recall that a hopping window emits a sub-total every X seconds of
>> records that have arrived over the last Y seconds. A tumbling window
>> is a hopping window where X and Y are equal.
>>
>> In https://calcite.incubator.apache.org/docs/stream.html#hopping-windows
>> I give an example, "emit, every hour, the number of each product
>> ordered over the past three hours".
>>
>> That example gives a query in terms of a GROUP BY (in the HourlyTotals
>> view) followed by a moving sum. I didn't think that it was possible to
>> express using just one GROUP BY, because that would violate one of the
>> principles of SQL: that each record entering a GROUP BY contributes to
>> precisely one output record.
>>
>> But I've just realized that the CUBE, ROLLUP and GROUPING SETS
>> operators (already in SQL) violate that principle. And if they can do
>> it, we can do the same. So we could add another grouping function,
>> HOP(t, emit, retain).
>>
>> The query would look like this:
>>
>> SELECT STREAM START(rowtime) AS rowtime,
>>   productId,
>>   SUM(units) AS sumUnits,
>>   COUNT(*) AS c
>> FROM Orders
>> GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '3' HOUR),
>>   productId
>>
>> Much nicer than the one in stream.html!
>>
>> The "trick" is that the HOP function is returning a list of rowtime
>> values. For example, for row 1 {rowtime: '09:33', ...} it will return
>> ['09:00', '10:00', '11:00']; for row 2 {rowtime: '10:05', ...} it will
>> return ['10:00', '11:00', '12:00']. The system adds each row to
>> several sub-totals, and emits each sub-total when it is complete. The
>> sub-total for '09:00' will contain only row 1, and will be emitted at
>> '10:00'; the sub-total for '10:00' will contain row 1 and row 2, and
>> will be emitted at '11:00', and so forth.
>>
>> Returning multiple values is related to the flatMap function in Spark
>> (and earlier selectMany in LINQ) and makes HOP's semantics similar to
>> GROUPING SETS and therefore sound.
>>
>> START is a new aggregate function that returns the lower bound of the
>> current sub-total; END similarly.
>>
>> Note that the "retain" argument does not need to be a whole multiple
>> of the "emit" argument. This was a major limitation in the previous
>> proposal.
>>
>> There are some straightforward extensions:
>> * Define a TUMBLE function;
>> * Add an "align" argument to HOP, to allow windows to start at, say, 5
>> minutes past each hour;
>> * Apply HOP to windows based on row-counts;
>> * Allow user-defined windowing functions that similarly return a list
>> of interval start-end points.
>>
>> Julian
>>
>
>
>
> --
> Milinda Pathirage
>
> PhD Student | Research Assistant
> School of Informatics and Computing | Data to Insight Center
> Indiana University
>
> twitter: milindalakmal
> skype: milinda.pathirage
> blog: http://milinda.pathirage.org
