Date: Thu, 30 Mar 2017 08:48:42 +0000 (UTC)
From: "Fabian Hueske (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Updated] (FLINK-6081) Offset/Fetch support for SQL Streaming

[ https://issues.apache.org/jira/browse/FLINK-6081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske updated FLINK-6081:
---------------------------------
    Issue Type: New Feature  (was: Bug)

> Offset/Fetch support for SQL Streaming
> --------------------------------------
>
>                 Key: FLINK-6081
>                 URL: https://issues.apache.org/jira/browse/FLINK-6081
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: radu
>        Attachments: offset.png
>
>
> Time target: Proc Time
> The main scope of OFFSET/FETCH is pagination support. In the context of
> streaming, OFFSET and FETCH make sense within the scope of certain
> window constructs, as they refer to data buffered from the stream (the
> main usage being to restrict the output shown at a certain moment).
> Therefore they should be applied to the output of the window types
> supported by the ORDER BY clause. Moreover, in accordance with SQL best
> practice, they can only be used together with an ORDER BY clause.
> SQL targeted query examples:
> ----------------------------
> Window defined based on group by clause
> ```Q1: SELECT a ORDER BY b OFFSET n ROWS FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL '3' HOUR) ```
> Window defined based on where clause time boundaries
> ```Q2: SELECT a ORDER BY b OFFSET n WHERE procTime() BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp FROM stream1 ```
> ~~Window defined as sliding windows (aggregates)~~
> ```Q3: ~~SELECT SUM(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING b OFFSET n ROWS) FROM stream1~~ ```
> Comment: Supporting OFFSET within a sliding window does not make sense,
> because the main scope of OFFSET/FETCH is pagination support. This
> functionality should therefore only be supported on the output of a
> query. Hence, Q3 will not be supported.
> The general grammar (Calcite version) for OFFSET/FETCH with its
> available parameters is shown below:
> ```
> SELECT [...]
> [ ORDER BY orderItem [, orderItem ]* ]
> [ OFFSET start { ROW | ROWS } ]
> [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ]
> ```
> Description
> -----------
> OFFSET and FETCH are primarily used for pagination support (i.e., to
> restrict the output shown at some point). They were mainly designed to
> support web-page display of contents. Building on this scenario, we can
> imagine a similar role for OFFSET and FETCH for streams whose contents
> are displayed via a web page. In such a scenario, the number of outputs
> to be displayed would be limited using these operators (probably for
> pagination and aesthetic reasons). However, since every stream
> application naturally evolves in time, the operators' output should
> evolve with the update rate of the application. The fact that there is
> an update rate and a collection of events related to a stream points to
> window constructs.
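
A minimal sketch (editor's addition, not Flink code) of the cardinality semantics that the OFFSET/FETCH grammar above describes for one window's buffered rows: sort, skip `offset` rows, emit at most `fetch` rows. The function name `offset_fetch` is hypothetical.

```python
def offset_fetch(rows, key, offset=0, fetch=None):
    """Sort buffered rows, skip the first `offset`, emit at most `fetch` rows."""
    ordered = sorted(rows, key=key)            # ORDER BY
    end = None if fetch is None else offset + fetch
    return ordered[offset:end]                 # OFFSET ... FETCH ...

# Rows of schema (a, b), ordered by b, skipping the first 2 (OFFSET 2 ROWS):
rows = [("a1", 7), ("c1", 2), ("b1", 5), ("d1", 2)]
print(offset_fetch(rows, key=lambda r: r[1], offset=2))
# -> [('b1', 5), ('a1', 7)]
```

Note that without the ORDER BY key the result would be nondeterministic, which is why the proposal requires an ORDER BY clause.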
> Therefore the OFFSET/FETCH functionality would be related to the window
> mechanisms/boundaries defined by the query. Hence, when the window
> construct is re-triggered, the output would again be filtered, from the
> cardinality point of view, based on the logic of OFFSET/FETCH.
> Because the primary purpose is supporting pagination (and controlling
> the number of outputs), we limit the usage of OFFSET/FETCH to window
> constructs that relate to the output. Because of this, supporting them
> on a sliding window with query aggregates (e.g., the Q3 query example)
> would not make sense. Additionally, there is an implicit need for an
> ordering clause, because OFFSET and FETCH refer to ordering positions.
> That is why these functions would be supported only if an ORDER BY
> clause is present.
> Functionality example
> ---------------------
> We exemplify the usage of OFFSET below using the following query. The
> event schema is of the form (a,b).
> ``` SELECT a ORDER BY b OFFSET 2 ROWS FROM stream1 GROUP BY CEIL(proctime TO HOUR) ```
> ||Proctime||IngestionTime(Event)||Stream1||Output||
> |     |10:00:01|(a1,7) |      |
> |     |10:05:00|(c1,2) |      |
> |     |10:12:00|(b1,5) |      |
> |     |10:50:00|(d1,2) |      |
> |10-11|        |       |b1,a1 |
> |     |11:03:00|(a2,10)|      |
> |11-12|        |       |nil   |
> |...  |        |       |      |
> Implementation option
> ---------------------
> There are 2 options to implement the logic of OFFSET/FETCH:
> 1) Within the logic of the window (i.e., a sorting window)
> As for sorting support (the ORDER BY clause), considering that the SQL
> operators will be associated with window boundaries, the functionality
> will be implemented within the logic of the window as follows. We
> extract the window boundaries and window type from the query logic.
> These will be used to define the type of the window and the triggering
> policy.
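
The functionality example above can be reproduced outside Flink with a short sketch (editor's addition; the helper names `hour_window` and `windowed_offset` are hypothetical): events are grouped into one-hour processing-time windows, sorted by b, and the first 2 rows are skipped.

```python
from collections import defaultdict

def hour_window(ts):
    """Map a 'HH:MM:SS' timestamp to the start hour of its one-hour window."""
    return int(ts.split(":")[0])

def windowed_offset(events, offset):
    windows = defaultdict(list)                 # window start hour -> [(a, b), ...]
    for ts, (a, b) in events:
        windows[hour_window(ts)].append((a, b))
    out = {}
    for hour, rows in sorted(windows.items()):
        ordered = sorted(rows, key=lambda r: r[1])          # ORDER BY b
        out[f"{hour}-{hour + 1}"] = [a for a, _ in ordered[offset:]]  # OFFSET
    return out

events = [("10:00:01", ("a1", 7)), ("10:05:00", ("c1", 2)),
          ("10:12:00", ("b1", 5)), ("10:50:00", ("d1", 2)),
          ("11:03:00", ("a2", 10))]
print(windowed_offset(events, offset=2))
# -> {'10-11': ['b1', 'a1'], '11-12': []}
```

The 10-11 window sorts to c1, d1, b1, a1 by b, so OFFSET 2 leaves b1, a1; the 11-12 window holds a single event, so its output is empty (nil), matching the table.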
> The logic of the query (i.e., the sorting of the events) will in turn
> be implemented within the window function. In addition, the logic for
> filtering the output based on the cardinality logic of OFFSET/FETCH
> will be added. With this implementation the logic of OFFSET and FETCH
> is combined with that of the ORDER BY clause. As ORDER BY is always
> required, this does not impose any implementation restrictions.
> 2) Within the logic of a filter/flatMap function (with a state counter
> for outputs)
> Instead of adding the logic within the window function, the filtering
> can be done within a standalone operator that only counts outputs and
> emits the ones that fall within the logic of OFFSET/FETCH. To provide
> this functionality we need a flatMap function in which we count the
> results. The OFFSET/FETCH condition would be transposed into the
> condition of an IF statement, applied based on the order of the
> outputs, to decide whether to emit each one. However, the counter would
> need to be reset in accordance with the triggering of the window, which
> makes the implementation tedious. This is despite the fact that this
> implementation option would directly translate the output-filtering
> logic of the operators from relational SQL.
> We recommend option 1 for implementation.
> Therefore, for option 1, we reuse the ORDER BY implementation entirely
> and just add:
> 1) A counter for indexing the outputs
> 2) An IF condition to emit the output only if the corresponding index
> counter falls within the scope defined by OFFSET/FETCH
> !offset.png!
> General logic
> -------------
> inputDataStream.window(new [Slide/Tumble][Time/Count]Window())
>   //.trigger(new [Time/Count]Trigger()) - use default
>   //.evictor(new [Time/Count]Evictor()) - use default
>   .apply(SortAndCountFilter());

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
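
Editor's addendum: a hypothetical sketch of the recommended option 1. `SortAndCountFilter` is not an existing Flink class; this models, outside Flink, the window-apply logic that sorts the buffered elements and then filters by output index, with the counter implicitly reset on every window invocation (which is what makes option 1 simpler than option 2).

```python
class SortAndCountFilter:
    """Sketch of a window function combining ORDER BY with OFFSET/FETCH filtering."""

    def __init__(self, sort_key, offset=0, fetch=None):
        self.sort_key, self.offset, self.fetch = sort_key, offset, fetch

    def apply(self, window_elements, collect):
        # The counter is local to each invocation, so it resets naturally
        # whenever the window fires (no external state to manage).
        emitted = 0
        for idx, element in enumerate(sorted(window_elements, key=self.sort_key)):
            if idx < self.offset:
                continue                                   # skip the first OFFSET rows
            if self.fetch is not None and emitted >= self.fetch:
                break                                      # stop after FETCH rows
            collect(element)
            emitted += 1

out = []
f = SortAndCountFilter(sort_key=lambda r: r[1], offset=1, fetch=2)
f.apply([("a1", 7), ("c1", 2), ("b1", 5), ("d1", 2)], out.append)
print(out)
# -> [('d1', 2), ('b1', 5)]
```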