Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 71C39200C61 for ; Mon, 20 Mar 2017 21:15:49 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 708DF160B71; Mon, 20 Mar 2017 20:15:49 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B2808160B96 for ; Mon, 20 Mar 2017 21:15:48 +0100 (CET) Received: (qmail 91889 invoked by uid 500); 20 Mar 2017 20:15:47 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 91740 invoked by uid 99); 20 Mar 2017 20:15:47 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Mar 2017 20:15:47 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 6B0E2C0A4B for ; Mon, 20 Mar 2017 20:15:47 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.451 X-Spam-Level: * X-Spam-Status: No, score=1.451 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RP_MATCHES_RCVD=-0.001, SPF_NEUTRAL=0.652] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id joR2o9UO9V7v for ; Mon, 20 Mar 2017 20:15:46 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTP id 4506361F00 for ; Mon, 20 Mar 2017 20:15:46 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 2E27AE0D1F for ; Mon, 20 Mar 2017 20:15:44 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id 05E0E25507 for ; Mon, 20 Mar 2017 20:15:42 +0000 (UTC) Date: Mon, 20 Mar 2017 20:15:42 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: issues@flink.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Mon, 20 Mar 2017 20:15:49 -0000 [ https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15933453#comment-15933453 ] ASF GitHub Bot commented on FLINK-5653: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3574#discussion_r106963715 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -150,12 +160,71 @@ class DataStreamOverAggregate( inputType, false) - inputDS - .process(processFunction).setParallelism(1).setMaxParallelism(1) - .returns(rowTypeInfo) - .name(aggOpName) - .asInstanceOf[DataStream[Row]] - } + inputDS + .process(processFunction).setParallelism(1).setMaxParallelism(1) + .returns(rowTypeInfo) + .name(aggOpName) + .asInstanceOf[DataStream[Row]] + } + result + } + + def createBoundedAndCurrentRowProcessingTimeOverWindow( + inputDS: DataStream[Row]): DataStream[Row] = { + + val overWindow: Group = logicWindow.groups.get(0) + val partitionKeys: Array[Int] = overWindow.keys.toArray + val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + + // get the output types + val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + + // window size is lowerbound +1 to comply with over semantics + val lowerbound: Int = AggregateUtil.getLowerBoundary( + logicWindow.constants, + overWindow.lowerBound, + getInput()) + 1 + + val (aggFunction, accumulatorRowType, aggResultRowType) = --- End diff -- `aggResultRowType` and `rowTypeInfo` should be the same. Please add a safety check for this. > Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL > -------------------------------------------------------------------- > > Key: FLINK-5653 > URL: https://issues.apache.org/jira/browse/FLINK-5653 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Fabian Hueske > Assignee: Stefano Bortoli > > The goal of this issue is to add support for OVER ROWS aggregations on processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5656) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some of the restrictions are trivial to address, we can add the functionality in this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)