Date: Thu, 16 Mar 2017 15:39:41 +0000 (UTC)
From: "radu (JIRA)"
To: dev@flink.apache.org
Subject: [jira] [Created] (FLINK-6082) Support window definition for SQL Queries based on WHERE clause with time condition

radu created FLINK-6082:
---------------------------

Summary: Support window definition for SQL Queries based on WHERE clause with time condition
Key: FLINK-6082
URL: https://issues.apache.org/jira/browse/FLINK-6082
Project: Flink
Issue Type: Sub-task
Components: Table API & SQL
Reporter: radu
Time target: Proc Time

The Calcite documentation includes query examples where the (time) boundaries are defined as conditions within the WHERE clause. As the Flink community targets compatibility with Calcite, it makes sense to support defining windows via this method as well, together with the corresponding aggregations on top of them.

SQL targeted query example:
----------------------------

```
SELECT productId, count(*)
FROM stream1
WHERE proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp
```

General comments:

1) Window boundaries are defined as conditions in the WHERE clause.
2) To indicate which stream time is used, either rowtime or proctime can be referenced.

3) The boundaries are defined based on special constructs provided by Calcite: current_timestamp and time operations.

Description:
------------

The logic of this operator is closely related to supporting aggregates over sliding windows defined with OVER ([FLINK-5653](https://issues.apache.org/jira/browse/FLINK-5653), [FLINK-5654](https://issues.apache.org/jira/browse/FLINK-5654), [FLINK-5655](https://issues.apache.org/jira/browse/FLINK-5655), [FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658), [FLINK-5656](https://issues.apache.org/jira/browse/FLINK-5656)). In those issues the design considered queries where the window is defined with the syntax of the OVER clause and aggregates are applied over this period. The behavior here is the same, with the only exception that the window boundaries are defined by the WHERE conditions. Apart from this, the logic and the types of aggregates to be supported should be the same (sum, count, avg, min, max).

Supporting this type of query is related to the pie chart problem tackled by Calcite. As with the OVER windows, the construct should build rolling windows (i.e., windows that are triggered by and move with every incoming event).

Functionality example
---------------------

We exemplify below the functionality of such WHERE-based windows when working with streams.

```
SELECT a, count(*)
FROM stream1
WHERE proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp;
```

||IngestionTime(Event)||Stream1||Output||
|10:00:01|Id1,10|Id1,1|
|10:02:00|Id2,2|Id2,2|
|11:25:00|Id3,2|Id3,1|
|12:03:00|Id4,15|Id4,2|
|12:05:00|Id5,11|Id5,3|
|12:56:00|Id6,20|Id6,3|
|...| | |

Implementation option
---------------------

Considering that the query follows the same functionality as the aggregates over windows, the implementation should follow the same approach as for the OVER clause.
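The rolling-window semantics of the functionality example above can be sketched in plain Java (a minimal sketch with no Flink or Calcite dependency; the class and method names are illustrative only, not part of any Flink API): for each arriving event, the aggregate is computed over all events whose timestamp falls within the trailing one-hour window.

```java
import java.util.ArrayList;
import java.util.List;

public class RollingWindowSketch {
    static final long ONE_HOUR_MS = 60L * 60 * 1000;

    // A stream element: an id plus its (processing) timestamp in milliseconds.
    record Event(String id, long ts) {}

    // For each event, in arrival order, count the events (including itself)
    // whose timestamp lies in the trailing one-hour window (ts - 1h, ts].
    static List<Integer> rollingCounts(List<Event> events) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            long now = events.get(i).ts();
            int count = 0;
            for (int j = 0; j <= i; j++) {
                long t = events.get(j).ts();
                if (t > now - ONE_HOUR_MS && t <= now) count++;
            }
            out.add(count);
        }
        return out;
    }

    public static void main(String[] args) {
        // Mirrors the first three rows of the table: 10:00, 10:02, 11:25.
        List<Event> events = List.of(
            new Event("Id1", 0L),
            new Event("Id2", 2 * 60_000L),
            new Event("Id3", 85 * 60_000L));
        System.out.println(rollingCounts(events)); // [1, 2, 1]
    }
}
```

As in the table, the third event produces a count of 1 because the first two events have already fallen out of its one-hour window.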
Considering that the WHERE conditions are typically related to timing, in the case of one unbounded boundary the [FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658) design should be used, while for bounded time windows the [FLINK-5654](https://issues.apache.org/jira/browse/FLINK-5654) design should be used. The window boundaries will be extracted from the WHERE condition.

The rule will no longer be mapped to a LogicalWindow, which means that the conversion would need to happen from the current DataStreamCalc rule. In this sense, a dedicated condition will be added such that, in case the WHERE clause has time conditions, the operator implementation of the OVER clause (used in the previous issues) is used instead.

```
class DataStreamCalcRule {

  def convert(rel: RelNode): RelNode = {
    val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
    val convInput: RelNode = RelOptRule.convert(calc.getInput, DataStreamConvention.INSTANCE)

    // pseudocode: dispatch on time conditions found in the WHERE clause
    IF (WHERE contains TIME limits) {
      IF (bounded) {
        new DataStreamProcTimeTimeAggregate
      } ELSE {
        new DataStreamSlideEventTimeRowAgg
      }
    } ELSE {
      new DataStreamCalc(
        rel.getCluster,
        traitSet,
        convInput,
        rel.getRowType,
        calc.getProgram,
        description)
    }
  }
}
```

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
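The dispatch in the pseudocode above can be sketched in plain Java (hypothetical stand-in types; the real rule operates on Calcite RelNodes): the time bounds extracted from the WHERE clause decide whether the bounded (FLINK-5654) or unbounded-preceding (FLINK-5658) over-window operator is reused, falling back to the plain calc otherwise.

```java
import java.util.Optional;

public class WhereWindowDispatch {
    // Hypothetical stand-in for the time bounds extracted from a WHERE clause;
    // an empty lower bound models an unbounded preceding window.
    record TimeWindowCondition(Optional<Long> lowerBoundMs) {}

    // Choose which translation to apply; the returned names are the operator
    // names used in the pseudocode above.
    static String chooseOperator(Optional<TimeWindowCondition> cond) {
        if (cond.isEmpty()) {
            return "DataStreamCalc";                  // no time condition: plain calc
        } else if (cond.get().lowerBoundMs().isPresent()) {
            return "DataStreamProcTimeTimeAggregate"; // bounded window (FLINK-5654)
        } else {
            return "DataStreamSlideEventTimeRowAgg";  // unbounded preceding (FLINK-5658)
        }
    }

    public static void main(String[] args) {
        System.out.println(chooseOperator(Optional.empty())); // DataStreamCalc
        System.out.println(chooseOperator(
            Optional.of(new TimeWindowCondition(Optional.of(3_600_000L)))));
        // DataStreamProcTimeTimeAggregate
    }
}
```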