Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5F120200C5E for ; Sat, 22 Apr 2017 22:43:50 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 5DAF6160BA2; Sat, 22 Apr 2017 20:43:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7DD93160B91 for ; Sat, 22 Apr 2017 22:43:49 +0200 (CEST) Received: (qmail 94084 invoked by uid 500); 22 Apr 2017 20:43:48 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 94075 invoked by uid 99); 22 Apr 2017 20:43:48 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 22 Apr 2017 20:43:48 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 51A471A033C for ; Sat, 22 Apr 2017 20:43:48 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.021 X-Spam-Level: X-Spam-Status: No, score=-4.021 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id kQr3fxI_e2D0 for ; Sat, 22 Apr 2017 20:43:46 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id E0DE75F20E for ; Sat, 22 Apr 2017 20:43:44 +0000 (UTC) Received: (qmail 94070 invoked by uid 99); 22 Apr 2017 20:43:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 22 Apr 2017 20:43:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 10C39E381C; Sat, 22 Apr 2017 20:43:44 +0000 (UTC) From: hequn8128 To: issues@flink.incubator.apache.org Reply-To: issues@flink.incubator.apache.org References: In-Reply-To: Subject: [GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract... Content-Type: text/plain Message-Id: <20170422204344.10C39E381C@git1-us-west.apache.org> Date: Sat, 22 Apr 2017 20:43:44 +0000 (UTC) archived-at: Sat, 22 Apr 2017 20:43:50 -0000 Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/3733#discussion_r112817910 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.scala.stream + +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit.Test +import org.apache.flink.table.api.scala._ +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.table.utils.TableFunc0 + +import scala.collection.mutable + +/** + * tests for retraction + */ +class RetractionITCase extends StreamingWithStateTestBase { + // input data + val data = List( + ("Hello", 1), + ("word", 1), + ("Hello", 1), + ("bark", 1) + ) + + // keyed groupby + keyed groupby + @Test + def testWordCount(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + env.setParallelism(1) + env.setStateBackend(getStateBackend) + + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'word, 'num) + val resultTable = table + .groupBy('word) + .select('word as 'word, 'num.sum as 'count) + .groupBy('count) + .select('count, 'word.count as 'frequency) + + val results = resultTable.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // keyed groupby + non-keyed groupby + @Test + def testGroupByAndNonKeyedGroupBy(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + env.setParallelism(1) + env.setStateBackend(getStateBackend) + + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'word, 'num) + val resultTable = table + .groupBy('word) + .select('word as 'word, 'num.sum as 'count) + .select('count.sum) + + val results = resultTable.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = Seq("1", "2", "1", "3", "4") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // non-keyed groupby + keyed groupby + @Test + def testNonKeyedGroupByAndGroupBy(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + env.setParallelism(1) + env.setStateBackend(getStateBackend) + + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'word, 'num) + val resultTable = table + .select('num.sum as 'count) + .groupBy('count) + .select('count, 'count.count) + + val results = resultTable.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = Seq("1,1", "1,0", "2,1", "2,0", "3,1", "3,0", "4,1") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // keyed groupby + over agg(unbounded, procTime, keyed) + @Test + def testGroupByAndUnboundPartitionedProcessingWindowWithRow(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStateBackend(getStateBackend) + env.setParallelism(1) + val tEnv = TableEnvironment.getTableEnvironment(env) + + StreamITCase.testResults = mutable.MutableList() + + val t1 = env.fromCollection(data).toTable(tEnv).as('word, 'number) + + tEnv.registerTable("T1", t1) + + val sqlQuery = "SELECT word, cnt, count(word) " + + "OVER (PARTITION BY cnt ORDER BY ProcTime() " + + "ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" + + "FROM " + + "(SELECT word, count(number) as cnt from T1 group by word) " + + val result = tEnv.sql(sqlQuery).toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList("Hello,1,1", "word,1,2", "Hello,2,1", "bark,1,2") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // keyed groupby + over agg(unbounded, procTime, non-keyed) + @Test + def testGroupByAndUnboundNonPartitionedProcessingWindowWithRow(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStateBackend(getStateBackend) + env.setParallelism(1) + val tEnv = TableEnvironment.getTableEnvironment(env) + + StreamITCase.testResults = mutable.MutableList() + + val t1 = env.fromCollection(data).toTable(tEnv).as('word, 'number) + + tEnv.registerTable("T1", t1) + + val sqlQuery = "SELECT word, cnt, count(word) " + + "OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" + + "FROM (SELECT word , count(number) as cnt from T1 group by word) " + + val result = tEnv.sql(sqlQuery).toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList("Hello,1,1", "word,1,2", "Hello,2,2", "bark,1,3") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // test unique process, if the current output message of unbounded groupby equals the + // previous message, unbounded groupby will ignore the current one. + @Test + def testUniqueProcess(): Unit = { + // data input + val data = List( + (1234, 2L), + (1234, 0L) + ) + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + env.setParallelism(1) + env.setStateBackend(getStateBackend) + + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'pk, 'value) + val resultTable = table + .groupBy('pk) + .select('pk as 'pk, 'value.sum as 'sum) + .groupBy('sum) + .select('sum, 'pk.count as 'count) + + val results = resultTable.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = Seq("2,1") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // correlate should handle retraction messages correctly + @Test + def testCorrelate(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + env.setParallelism(1) + env.setStateBackend(getStateBackend) + + val func0 = new TableFunc0 + + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'word, 'num) + val resultTable = table + .groupBy('word) + .select('word as 'word, 'num.sum as 'count) + .leftOuterJoin(func0('word)) + .groupBy('count) + .select('count, 'word.count as 'frequency) + + val results = resultTable.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // groupby + window agg + @Test(expected = classOf[TableException]) + def testGroupByAndProcessingTimeSlidingGroupWindow(): Unit = { --- End diff -- hi, these failures are throwed during `translateToPlan`, so `TableTestBase` can not help, right ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---