From: sunjincheng121
To: issues@flink.incubator.apache.org
Subject: [GitHub] flink pull request #3641: [FLINK-5654] - Add processing time OVER RANGE BETW...
Date: Thu, 30 Mar 2017 01:24:19 +0000 (UTC)

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3641#discussion_r108823957

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---
    @@ -696,6 +713,205 @@ class SqlITCase extends StreamingWithStateTestBase {
           "6,8,Hello world,51,9,5,9,1")
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
    +
    +  @Test
    +  def testAvgSumAggregatationPartition(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setParallelism(1)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY procTime()" +
    +      "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS avgC," +
    +      "SUM(c) OVER (PARTITION BY a ORDER BY procTime()" +
    +      "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) as sumC FROM MyTable"
    +
    +    val t = StreamTestData.get5TupleDataStream(env)
    +      .toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
    +
    +    tEnv.registerTable("MyTable", t)
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "1,0,0",
    +      "2,1,1",
    +      "2,1,3",
    +      "3,3,3",
    +      "3,3,7",
    +      "3,4,12",
    +      "4,6,13",
    +      "4,6,6",
    +      "4,7,21",
    +      "4,7,30",
    +      "5,10,10",
    +      "5,10,21",
    +      "5,11,33",
    +      "5,11,46",
    +      "5,12,60")
    +
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testAvgSumAggregatationNonPartition(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setParallelism(1)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY procTime()" +
    +      "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS avgC," +
    +      "MIN(c) OVER (ORDER BY procTime()" +
    +      "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) as sumC FROM MyTable"
    +
    +    val t = StreamTestData.get5TupleDataStream(env)
    +      .toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
    +
    +    tEnv.registerTable("MyTable", t)
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "1,1,0",
    +      "2,2,0",
    +      "2,3,0",
    +      "3,4,0",
    +      "3,5,0",
    +      "3,6,0",
    +      "4,7,0",
    +      "4,8,0",
    +      "4,9,0",
    +      "4,10,0",
    +      "5,11,0",
    +      "5,12,0",
    +      "5,13,0",
    +      "5,14,0",
    +      "5,15,0")
    +
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +
    +  @Test
    +  def testCountAggregatationProcTimeHarnessPartitioned(): Unit = {
    --- End diff --

    `Aggregatation` -> `Aggregation` ?