samza-dev mailing list archives

From Yi Pan <>
Subject Re: Triggering emits for streaming window aggregates
Date Fri, 26 Jun 2015 18:30:02 GMT
Hi, Milinda,

I thought that in your example, the ordering field is given in GROUP BY.
Are we missing a way to pass the ordering field(s) to the LogicalAggregate?


On Fri, Jun 26, 2015 at 10:49 AM, Milinda Pathirage <>

> Hi Julian,
> This is a general question across all streaming aggregates that use a
> GROUP BY clause and a monotonic timestamp field to specify the window, but
> I am going to stick to the most basic example (which is from the Calcite
> Streaming document).
> SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime,
>   productId,
>   COUNT(*) AS c,
>   SUM(units) AS units
> FROM Orders
> GROUP BY FLOOR(rowtime TO HOUR), productId;
> I was trying to implement an aggregate operator that handles tumbling
> windows via the monotonic field in the GROUP BY clause, in addition to the
> general aggregations. I went down this path because I thought integrating
> windowing aspects (at least for tumbling and hopping windows) into the
> aggregate operator would be easier than trying to extract the window spec
> from the query plan for a query like the one above. But I hit a wall when
> trying to figure out the trigger condition for emitting aggregate results.
> I was initially planning to detect new values for FLOOR(rowtime TO HOUR)
> and emit the current aggregate results for previous groups (I was thinking
> of keeping old groups around until we clean them up after a timeout). But
> when trying to implement this I realized that I don't know how to check
> which GROUP BY field is monotonic, so that I detect new values only for the
> monotonic field or fields and not for all the other fields. I think this is
> not a problem for tables, because we have the whole input before
> computation and we wait until we are done with the input before emitting
> the results.
> With regard to the above, can you please clarify the following:
> - Does the method I described above for handling streaming aggregates make
> sense at all?
> - Is there a way that I can figure out which fields/expressions in
> LogicalAggregate are monotonic?
> - Or can we write a rule to annotate or add extra metadata to
> LogicalAggregate so that we can get the monotonic fields in the GROUP BY
> clause?
> Thanks in advance
> Milinda
> --
> Milinda Pathirage
> PhD Student | Research Assistant
> School of Informatics and Computing | Data to Insight Center
> Indiana University
> twitter: milindalakmal
> skype: milinda.pathirage
> blog:
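
For reference, the trigger condition described in the quoted message (emit the previous groups when a new value of FLOOR(rowtime TO HOUR) shows up) can be sketched roughly as below. This is only an illustration and not Samza or Calcite code: the class name `TumblingHourlyAggregate`, its methods, and the millisecond timestamps are all hypothetical. It also assumes the operator has already been told that rowtime is the monotonic GROUP BY field, which is exactly the open question in this thread, and it drops a window's groups as soon as they are emitted rather than keeping them around until a timeout.

```python
HOUR_MS = 3_600_000  # one hour in milliseconds


def floor_to_hour(ts_ms):
    """Equivalent of FLOOR(rowtime TO HOUR) for epoch-millisecond timestamps."""
    return ts_ms - ts_ms % HOUR_MS


class TumblingHourlyAggregate:
    """Hypothetical operator computing COUNT(*) and SUM(units) per
    (hour, productId). Assumes rowtime is known to be monotonic."""

    def __init__(self):
        self.window = None  # start of the hour currently being aggregated
        self.groups = {}    # productId -> [count, sum_of_units]

    def process(self, rowtime_ms, product_id, units):
        """Consume one row; return the rows emitted by this call (maybe none)."""
        window = floor_to_hour(rowtime_ms)
        emitted = []
        if self.window is not None and window < self.window:
            # Monotonicity assumption violated; this sketch does not handle late rows.
            raise ValueError("rowtime went backwards")
        if self.window is not None and window > self.window:
            # A new value of the monotonic key closes the previous window.
            emitted = self.flush()
        self.window = window
        acc = self.groups.setdefault(product_id, [0, 0])
        acc[0] += 1      # COUNT(*)
        acc[1] += units  # SUM(units)
        return emitted

    def flush(self):
        """Emit (rowtime, productId, c, units) for every group in the open window."""
        rows = [(self.window, pid, c, u)
                for pid, (c, u) in sorted(self.groups.items())]
        self.groups = {}
        return rows
```

Note that a change in productId alone never triggers an emit here; only the time key does, which is why the operator needs to know which GROUP BY expression is the monotonic one.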
