flink-dev mailing list archives

From "radu (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-6249) Distinct Aggregates for OVER window
Date Mon, 03 Apr 2017 14:52:41 GMT
radu created FLINK-6249:
---------------------------

             Summary: Distinct Aggregates for OVER window
                 Key: FLINK-6249
                 URL: https://issues.apache.org/jira/browse/FLINK-6249
             Project: Flink
          Issue Type: New Feature
          Components: Table API & SQL
    Affects Versions: 1.3.0
            Reporter: radu


Time target: ProcTime/EventTime

SQL targeted query examples:
----------------------------

Q1. Boundaries are expressed in windows and meant for the elements to be aggregated

Q1.1. `SELECT SUM(DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`

Q1.2. `SELECT SUM(DISTINCT b) OVER (ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1`

Q1.3. `SELECT SUM(DISTINCT b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`

Q1.4. `SELECT SUM(DISTINCT b) OVER (ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1`

General comments:

-   The DISTINCT operation makes sense only within the context of windows or
    other bounded structures. Otherwise the operation would have to keep
    an unbounded amount of data to ensure uniqueness, and certain
    functions (e.g. aggregates) would never trigger.

-   We can consider as a sub-JIRA issue the implementation of DISTINCT
    for UNBOUNDED sliding windows. However, there would be no control
    over the size of the data structure that keeps the seen data (to check
    that an element is not re-processed).

=> We need to decide whether we want to support this (and create the appropriate JIRA issues).

=> We will open sub-JIRA issues to extend the current functionality of aggregates for the
DISTINCT case (Q1.{1-4}). This is the main target of this JIRA.

=> Aggregations over distinct elements without any boundary (i.e.
    within a SELECT clause) do not make sense, just as aggregations do not
    make sense without groupings or windows.
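
To make the state-size concern concrete, here is a minimal Python sketch (plain Python, not Flink code; both function names are made up for illustration). It compares how many values have to be remembered for a DISTINCT without any boundary against a ROWS-bounded window.

```python
from collections import deque

def unbounded_distinct_state(values):
    """State size of a DISTINCT with no boundary: every value ever seen is kept."""
    seen = set()
    for v in values:
        seen.add(v)
    return len(seen)

def bounded_distinct_state(values, rows):
    """Peak state size when only the last `rows` elements are retained."""
    window = deque(maxlen=rows)  # older elements are dropped automatically
    peak = 0
    for v in values:
        window.append(v)
        peak = max(peak, len(set(window)))
    return peak

stream = range(10_000)  # every element is new: the worst case for DISTINCT
print(unbounded_distinct_state(stream))   # 10000, grows with the stream
print(bounded_distinct_state(stream, 3))  # 3, never exceeds the window size
```

The bounded variant corresponds to `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW`, where at most three values need to be tracked at any time.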

Other similar query support
------------

Q2. Boundaries are expressed in GROUP BY clause and distinct is applied for the elements of
the aggregate(s)

`SELECT MIN( DISTINCT rowtime), prodID FROM stream1 GROUP BY FLOOR(procTime() TO HOUR)`

=> We need to decide whether we aim to support distinct aggregates for GROUP BY (Q2) in this
release. If so, sub-JIRA issues need to be created. We can follow the same design/implementation.

=> We can consider as a sub-JIRA issue the implementation of DISTINCT
    for SELECT clauses. However, there is no control over the growing
    size of the data structure, and it would eventually exhaust the available memory.

Q3. Distinct is applied to the collection of outputs to be selected.

`SELECT STREAM DISTINCT procTime(), prodId  FROM stream1 GROUP BY FLOOR(procTime() TO DAY)`

Description:
------------

The DISTINCT operator requires processing the elements to ensure
uniqueness. Whether the operation is used to SELECT all distinct
elements or to apply typical aggregation functions over a set of
elements, a collection of elements has to be formed first.
This brings the need for windows or grouping methods. Therefore the distinct function
will be implemented within windows. Depending on the
type of window definition there are several options:

-   Main scope: If distinct is applied as in the Q1 example for window aggregations, then either
    we extend the implementation with distinct aggregates (less preferred) or we extend the sliding
    window aggregates implementation in the processFunction with support for identifying distinct
    elements (preferred). The latter option is preferred because a query can carry multiple aggregates,
    including multiple aggregates that have the DISTINCT keyword set. Implementing the distinction
    between elements in the process function avoids the need to duplicate the data structure that
    marks what was seen across multiple aggregates. It also makes the implementation more
    robust and resilient, as we can keep the data structure for marking the seen elements in a state
    (mapstate).
-   If distinct is applied as in the Q2 example on group elements, then
    either we define a new implementation if selection is general or
    extend the current implementation of grouped aggregates with
    distinct group aggregates.
-   If distinct is applied as in the Q3 example for selecting all elements,
    then a new implementation needs to be defined. This would work over
    a specific window, and the uniqueness of the results to be processed
    would be ensured within the window function.
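
The preferred option for Q1 can be sketched in plain Python as follows. This is only an illustration under stated assumptions, not Flink code: the class `DistinctOverWindow` is hypothetical, a `Counter` stands in for the mapstate, and it stores occurrence counts rather than simple seen-markers so that the sketch can tell when the last copy of a value leaves the sliding window. One shared structure serves two distinct aggregates (sum and count) at once.

```python
from collections import Counter, deque

class DistinctOverWindow:
    """ROWS-bounded OVER window tracking distinct values once for all aggregates."""

    def __init__(self, preceding):
        self.size = preceding + 1      # N PRECEDING AND CURRENT ROW
        self.rows = deque()            # window contents in arrival order
        self.occurrences = Counter()   # stand-in for MapState: value -> count in window
        self.distinct_sum = 0
        self.distinct_count = 0

    def process(self, value):
        # Accumulate: only the first occurrence in the window changes the aggregates.
        self.rows.append(value)
        self.occurrences[value] += 1
        if self.occurrences[value] == 1:
            self.distinct_sum += value
            self.distinct_count += 1
        # Retract the row that falls out of the window, if any.
        if len(self.rows) > self.size:
            old = self.rows.popleft()
            self.occurrences[old] -= 1
            if self.occurrences[old] == 0:
                del self.occurrences[old]
                self.distinct_sum -= old
                self.distinct_count -= 1
        return self.distinct_sum, self.distinct_count

w = DistinctOverWindow(preceding=2)
results = [w.process(v) for v in [1, 2, 2, 2, 5]]
print(results)  # [(1, 1), (3, 2), (3, 2), (2, 1), (7, 2)]
```

Because both aggregates read the same `occurrences` map, adding a third distinct aggregate would not require another copy of the seen-values structure.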


Functionality example
---------------------

We exemplify below the functionality of DISTINCT when working with
streams.

`Q1: SELECT STREAM DISTINCT b FROM stream1 GROUP BY FLOOR(PROCTIME() TO HOUR)`

`Q2: SELECT COUNT(DISTINCT b) FROM stream1 GROUP BY FLOOR(PROCTIME() TO HOUR)`

`Q3: SELECT SUM(DISTINCT a) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`



<table class="tg">
  <tr>
    <th class="tg-9hbo">Proctime</th>
    <th class="tg-9hbo">IngestionTime(Event)</th>
    <th class="tg-9hbo">Stream1</th>
    <th class="tg-9hbo">Q1</th>
    <th class="tg-9hbo">Q2</th>
    <th class="tg-9hbo">Q3</th>
  </tr>
  <tr>
    <td class="tg-yw4l"></td>
    <td class="tg-yw4l">10:00:01</td>
    <td class="tg-yw4l">(ab, 1)</td>
    <td class="tg-yw4l"></td>
    <td class="tg-yw4l"></td>
    <td class="tg-yw4l">1</td>
  </tr>
  <tr>
    <td class="tg-yw4l"></td>
    <td class="tg-yw4l">10:05:00</td>
    <td class="tg-yw4l">(aa, 2)</td>
    <td class="tg-yw4l"></td>
    <td class="tg-yw4l"></td>
    <td class="tg-yw4l">3</td>
  </tr>
  <tr>
    <td class="tg-yw4l">10-11</td>
    <td class="tg-yw4l">11:00:00</td>
    <td class="tg-yw4l"></td>
    <td class="tg-yw4l">ab,aa</td>
    <td class="tg-yw4l">2</td>
    <td class="tg-yw4l"></td>
  </tr>
  <tr>
    <td class="tg-yw4l"></td>
    <td class="tg-yw4l">11:03:00</td>
    <td class="tg-yw4l">(aa,2)</td>
    <td class="tg-yw4l"></td>
    <td class="tg-yw4l"></td>
    <td class="tg-yw4l">3</td>
  </tr>
  <tr>
    <td class="tg-yw4l"></td>
    <td class="tg-yw4l">11:09:00</td>
    <td class="tg-yw4l">(aa,2)</td>
    <td class="tg-yw4l"></td>
    <td class="tg-yw4l"></td>
    <td class="tg-yw4l">2</td>
  </tr>
  <tr>
    <td class="tg-9hbo">11-12</td>
    <td class="tg-yw4l">12:00:00</td>
    <td class="tg-yw4l"></td>
    <td class="tg-yw4l">aa</td>
    <td class="tg-yw4l">1</td>
    <td class="tg-yw4l"></td>
  </tr>
  <tr>
    <td class="tg-9hbo" colspan="6">...</td>
  </tr>
</table>
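
The Q1, Q2 and Q3 columns of the table can be cross-checked with a small reference computation. The Python sketch below is not Flink code; it assumes that each Stream1 tuple is ordered as (b, a), which is inferred from the query results, and that the time in the second table column decides the hourly bucket. The helper names are made up for illustration.

```python
from collections import defaultdict

# (time, b, a) rows copied from the table; the (b, a) field order is an assumption.
events = [
    ("10:00:01", "ab", 1),
    ("10:05:00", "aa", 2),
    ("11:03:00", "aa", 2),
    ("11:09:00", "aa", 2),
]

def distinct_per_hour(rows):
    """Q1: distinct b values per one-hour group, in arrival order."""
    buckets = defaultdict(list)
    for time, b, _ in rows:
        hour = int(time[:2])
        if b not in buckets[hour]:
            buckets[hour].append(b)
    return dict(buckets)

def sum_distinct_rows(rows, preceding):
    """Q3: SUM(DISTINCT a) over the current row and `preceding` earlier rows."""
    out = []
    for i in range(len(rows)):
        window = rows[max(0, i - preceding): i + 1]
        out.append(sum({a for _, _, a in window}))
    return out

q1 = distinct_per_hour(events)
q2 = {hour: len(bs) for hour, bs in q1.items()}  # Q2: COUNT(DISTINCT b) per hour
q3 = sum_distinct_rows(events, preceding=2)
print(q1)  # {10: ['ab', 'aa'], 11: ['aa']}
print(q2)  # {10: 2, 11: 1}
print(q3)  # [1, 3, 3, 2]
```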



Implementation option
---------------------

Considering that the behavior depends on the OVER window behavior, the
implementation will reuse the existing implementation of the OVER window functions,
which is based on processFunction. As mentioned in the description section, there are 2 options
to consider:

1)  Using distinct within the aggregates implementation, by extending the current aggregates
in Flink with distinct aggregate implementations. For this we define additional JIRA issues
for each implementation to support the DISTINCT keyword.

2)  Using distinct for selection within the process logic when calling the aggregates. This
requires a new implementation of the processFunction used for computing the aggregates. The
processFunction will also carry the logic of taking each element only once. For this, 2 options
are possible. Option 1 (to be used within the processFunction) trades memory: it would
require creating a hashmap (e.g. mapstate) with binary values to mark whether the event was seen
before. This would be created once per window and reused across multiple distinct
aggregates. Option 2 trades computation: it would require sorting the window contents and
eliminating identical elements. The sorting can be done based on hash values
in case the events are non-numeric or composite, or do not possess an id to mark their uniqueness.
Option 2 is not preferred for incremental aggregates and should be considered only if certain
aggregates require a window implementation that recomputes everything from scratch.
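
As a rough illustration of Option 2, the following Python sketch (not Flink code; `distinct_by_sorting` is a hypothetical helper) sorts the window contents by hash value and drops repeated elements before an aggregate is recomputed from scratch. Since different elements can share a hash value, the sketch compares each element against the others in the same equal-hash run instead of only against its immediate neighbour.

```python
def distinct_by_sorting(window, key=hash):
    """Remove duplicates from `window` by sorting on `key` and scanning runs."""
    ordered = sorted(window, key=key)
    unique = []
    run_start = 0    # index in `unique` where the current equal-key run begins
    prev_key = None
    for item in ordered:
        k = key(item)
        if not unique or k != prev_key:
            # A new key starts a new run; the item cannot be a duplicate.
            run_start = len(unique)
            prev_key = k
            unique.append(item)
        elif item not in unique[run_start:]:
            # Same key but a different element (hash collision): keep it.
            unique.append(item)
    return unique

# Recompute SUM(DISTINCT ...) from scratch for one window's contents.
print(sum(distinct_by_sorting([1, 2, 2])))  # 3
```

The whole window is sorted on every evaluation, which is why this variant only suits aggregates that are recomputed from scratch anyway.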






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
