Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 063AA200B91 for ; Thu, 29 Sep 2016 15:47:52 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0519B160AE9; Thu, 29 Sep 2016 13:47:52 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 07B78160AE4 for ; Thu, 29 Sep 2016 15:47:49 +0200 (CEST) Received: (qmail 50546 invoked by uid 500); 29 Sep 2016 13:47:48 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 50537 invoked by uid 99); 29 Sep 2016 13:47:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 29 Sep 2016 13:47:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AB078DFE65; Thu, 29 Sep 2016 13:47:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: twalthr@apache.org To: commits@flink.apache.org Date: Thu, 29 Sep 2016 13:47:48 -0000 Message-Id: <241c9507dcdb4f3cadebd06dea2a0c49@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] flink git commit: [FLINK-3656] [table] Consolidate ITCases archived-at: Thu, 29 Sep 2016 13:47:52 -0000 Repository: flink Updated Branches: refs/heads/master 8243138c3 -> 7758571ae http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala deleted file mode 100644 index 8889b37..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.batch.table - -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - - @Test(expected = classOf[ValidationException]) - def testGroupingOnNonExistentField(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - // must fail. '_foo not a valid field - .groupBy('_foo) - .select('a.avg) - } - - @Test(expected = classOf[ValidationException]) - def testGroupingInvalidSelection(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - .groupBy('a, 'b) - // must fail. 'c is not a grouping key or aggregation - .select('c) - } - - @Test - def testGroupedAggregate(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - .groupBy('b) - .select('b, 'a.sum) - - val expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n" - val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testGroupingKeyForwardIfNotUsed(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - .groupBy('b) - .select('a.sum) - - val expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n" - val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testGroupNoAggregation(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val t = CollectionDataSets.get3TupleDataSet(env) - .toTable(tEnv, 'a, 'b, 'c) - .groupBy('b) - .select('a.sum as 'd, 'b) - .groupBy('b, 'd) - .select('b) - - val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n" - val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testGroupedAggregateWithLongKeys(): Unit = { - // This uses very long keys to force serialized comparison. - // With short keys, the normalized key is sufficient. - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val ds = env.fromElements( - ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2), - ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2), - ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2), - ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2), - ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2), - ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2), - ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2), - ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2), - ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2)) - .rebalance().setParallelism(2).toTable(tEnv, 'a, 'b, 'c) - .groupBy('a, 'b) - .select('c.sum) - - val expected = "10\n" + "8\n" - val results = ds.collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testGroupedAggregateWithConstant1(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - .select('a, 4 as 'four, 'b) - .groupBy('four, 'a) - .select('four, 'b.sum) - - val expected = "4,2\n" + "4,3\n" + "4,5\n" + "4,5\n" + "4,5\n" + "4,6\n" + - "4,6\n" + "4,6\n" + "4,3\n" + "4,4\n" + "4,6\n" + "4,1\n" + "4,4\n" + - "4,4\n" + "4,5\n" + "4,6\n" + "4,2\n" + "4,3\n" + "4,4\n" + "4,5\n" + "4,6\n" - val results = t.toDataSet[Row].collect() - - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testGroupedAggregateWithConstant2(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - .select('b, 4 as 'four, 'a) - .groupBy('b, 'four) - .select('four, 'a.sum) - - val expected = "4,1\n" + "4,5\n" + "4,15\n" + "4,34\n" + "4,65\n" + "4,111\n" - val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testGroupedAggregateWithExpression(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) - .groupBy('e, 'b % 3) - .select('c.min, 'e, 'a.avg, 'd.count) - - val expected = "0,1,1,1\n" + "3,2,3,3\n" + "7,1,4,2\n" + "14,2,5,1\n" + - "5,3,4,2\n" + "2,1,3,2\n" + "1,2,3,3\n" + "12,3,5,1" - val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testGroupedAggregateWithFilter(): Unit = { - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - .groupBy('b) - .select('b, 'a.sum) - .where('b === 2) - - val expected = "2,5\n" - val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala index f6e6081..67cac14 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala @@ -19,12 +19,14 @@ package org.apache.flink.api.scala.batch.table import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{TableException, ValidationException, Row, TableEnvironment} import org.apache.flink.api.table.expressions.Literal +import org.apache.flink.api.table.{Row, TableEnvironment, TableException, ValidationException} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} +import org.apache.flink.test.util.TestBaseUtils import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -32,12 +34,15 @@ import org.junit.runners.Parameterized import scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) -class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { +class JoinITCase( + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { @Test def testJoin(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) @@ -53,7 +58,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) def testJoinWithFilter(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h) @@ -68,7 +73,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) @Test def testJoinWithJoinFilter(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) @@ -84,7 +89,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) @Test def testJoinWithMultipleKeys(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) @@ -100,7 +105,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) @Test(expected = classOf[ValidationException]) def testJoinNonExistingKey(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) @@ -114,7 +119,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) @Test(expected = classOf[ValidationException]) def testJoinWithNonMatchingKeyTypes(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) @@ -128,7 +133,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) @Test(expected = classOf[ValidationException]) def testJoinWithAmbiguousFields(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'c) @@ -142,7 +147,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) @Test(expected = classOf[TableException]) def testNoEqualityJoinPredicate1(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) @@ -156,7 +161,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) @Test(expected = classOf[TableException]) def testNoEqualityJoinPredicate2(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) @@ -170,7 +175,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) @Test def testJoinWithAggregation(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) @@ -185,7 +190,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) @Test def testJoinWithGroupedAggregation(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) @@ -203,7 +208,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) @Test def testJoinPushThroughJoin(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) @@ -223,7 +228,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) @Test def testJoinWithDisjunctivePred(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) @@ -240,7 +245,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) @Test def testJoinWithExpressionPreds(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) @@ -259,8 +264,8 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) @Test(expected = classOf[ValidationException]) def testJoinTablesFromDifferentEnvs(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv1 = TableEnvironment.getTableEnvironment(env) - val tEnv2 = TableEnvironment.getTableEnvironment(env) + val tEnv1 = TableEnvironment.getTableEnvironment(env, config) + val tEnv2 = TableEnvironment.getTableEnvironment(env, config) val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c) val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv2, 'd, 'e, 'f, 'g, 'h) @@ -272,7 +277,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) @Test def testLeftJoinWithMultipleKeys(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) tEnv.getConfig.setNullCheck(true) val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) @@ -294,7 +299,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) @Test(expected = classOf[ValidationException]) def testNoJoinCondition(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) tEnv.getConfig.setNullCheck(true) val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) @@ -306,7 +311,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) @Test(expected = classOf[ValidationException]) def testNoEquiJoin(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) tEnv.getConfig.setNullCheck(true) val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) @@ -318,7 +323,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) @Test def testRightJoinWithMultipleKeys(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) tEnv.getConfig.setNullCheck(true) val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) @@ -337,7 +342,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) @Test def testRightJoinWithNotOnlyEquiJoin(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) tEnv.getConfig.setNullCheck(true) val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) @@ -353,7 +358,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) @Test def testFullOuterJoinWithMultipleKeys(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) + val tEnv = TableEnvironment.getTableEnvironment(env, config) tEnv.getConfig.setNullCheck(true) val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala deleted file mode 100644 index 1143afd..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.batch.table - -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase -import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException} -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.apache.flink.test.util.TestBaseUtils -import org.junit.Assert._ -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class SelectITCase( - mode: TestExecutionMode, - configMode: TableConfigMode) - extends TableProgramsTestBase(mode, configMode) { - - @Test - def testSimpleSelectAll(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).select('_1, '_2, '_3) - - val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + - "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + - "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + - "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + - "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + - "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testSimpleSelectAllWithAs(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('a, 'b, 'c) - - val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + - "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + - "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + - "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + - "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + - "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testSimpleSelectWithNaming(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) - .select('_1 as 'a, '_2 as 'b, '_1 as 'c) - .select('a, 'b) - - val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + - "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + - "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n" - val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testSimpleSelectRenameAll(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) - .select('_1 as 'a, '_2 as 'b, '_3 as 'c) - .select('a, 'b) - - val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + - "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + - "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n" - val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[ValidationException]) - def testSelectInvalidFieldFields(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - // must fail. Field 'foo does not exist - .select('a, 'foo) - } - - @Test(expected = classOf[ValidationException]) - def testSelectAmbiguousRenaming(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - // must fail. 'a and 'b are both renamed to 'foo - .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print() - } - - @Test(expected = classOf[ValidationException]) - def testSelectAmbiguousRenaming2(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c) - // must fail. 'a and 'b are both renamed to 'a - .select('a, 'b as 'a).toDataSet[Row].print() - } - - @Test - def testSelectStar(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*) - - val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + - "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + - "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + - "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + - "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + - "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testAliasStarException(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - try { - CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, '*, 'b, 'c) - fail("ValidationException expected") - } catch { - case _: ValidationException => //ignore - } - - try { - CollectionDataSets.get3TupleDataSet(env).toTable(tEnv) - .select('_1 as '*, '_2 as 'b, '_1 as 'c) - fail("ValidationException expected") - } catch { - case _: ValidationException => //ignore - } - - try { - CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('*, 'b, 'c) - fail("ValidationException expected") - } catch { - case _: ValidationException => //ignore - } - - try { - CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*, 'b) - fail("ValidationException expected") - } catch { - case _: ValidationException => //ignore - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ToTableITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ToTableITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ToTableITCase.scala deleted file mode 100644 index 84bdbb0..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ToTableITCase.scala +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.batch.table - -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase -import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.api.table.{Row, TableEnvironment, TableException} -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.apache.flink.test.util.TestBaseUtils -import org.junit._ -import org.junit.runner.RunWith -import org.junit.runners.Parameterized - -import scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class ToTableITCase( - mode: TestExecutionMode, - configMode: TableConfigMode) - extends TableProgramsTestBase(mode, configMode) { - - @Test - def testToTable(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val t = CollectionDataSets.get3TupleDataSet(env) - .toTable(tEnv, 'a, 'b, 'c) - .select('a, 'b, 'c) - - val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + - "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + - "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + - "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + - "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + - "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testToTableFromCaseClass(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val data = List( - SomeCaseClass("Peter", 28, 4000.00, "Sales"), - SomeCaseClass("Anna", 56, 10000.00, "Engineering"), - SomeCaseClass("Lucy", 42, 6000.00, "HR")) - - val t = env.fromCollection(data) - .toTable(tEnv, 'a, 'b, 'c, 'd) - .select('a, 'b, 'c, 'd) - - val expected: String = - "Peter,28,4000.0,Sales\n" + - "Anna,56,10000.0,Engineering\n" + - "Lucy,42,6000.0,HR\n" - val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test - def testToTableFromAndToCaseClass(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - val data = List( - SomeCaseClass("Peter", 28, 4000.00, "Sales"), - SomeCaseClass("Anna", 56, 10000.00, "Engineering"), - SomeCaseClass("Lucy", 42, 6000.00, "HR")) - - val t = env.fromCollection(data) - .toTable(tEnv, 'a, 'b, 'c, 'd) - .select('a, 'b, 'c, 'd) - - val expected: String = - "SomeCaseClass(Peter,28,4000.0,Sales)\n" + - "SomeCaseClass(Anna,56,10000.0,Engineering)\n" + - "SomeCaseClass(Lucy,42,6000.0,HR)\n" - val results = t.toDataSet[SomeCaseClass].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) - } - - @Test(expected = classOf[TableException]) - def testToTableWithToFewFields(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - CollectionDataSets.get3TupleDataSet(env) - // Must fail. Number of fields does not match. - .toTable(tEnv, 'a, 'b) - } - - @Test(expected = classOf[TableException]) - def testToTableWithToManyFields(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - CollectionDataSets.get3TupleDataSet(env) - // Must fail. Number of fields does not match. - .toTable(tEnv, 'a, 'b, 'c, 'd) - } - - @Test(expected = classOf[TableException]) - def testToTableWithAmbiguousFields(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - CollectionDataSets.get3TupleDataSet(env) - // Must fail. Field names not unique. - .toTable(tEnv, 'a, 'b, 'b) - } - - @Test(expected = classOf[TableException]) - def testToTableWithNonFieldReference1(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - // Must fail. as() can only have field references - CollectionDataSets.get3TupleDataSet(env) - .toTable(tEnv, 'a + 1, 'b, 'c) - } - - @Test(expected = classOf[TableException]) - def testToTableWithNonFieldReference2(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - // Must fail. as() can only have field references - CollectionDataSets.get3TupleDataSet(env) - .toTable(tEnv, 'a as 'foo, 'b, 'c) - } - -} - -case class SomeCaseClass(name: String, age: Int, salary: Double, department: String) { - def this() { this("", 0, 0.0, "") } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala index 772850d..2ce42d4 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala @@ -20,8 +20,7 @@ package org.apache.flink.api.scala.batch.utils import java.util -import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode -import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode._ +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.{EFFICIENT, NO_NULL, TableConfigMode} import org.apache.flink.api.table.TableConfig import org.apache.flink.test.util.MultipleProgramsTestBase import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode @@ -37,7 +36,7 @@ class TableProgramsTestBase( def config: TableConfig = { val conf = new TableConfig tableConfigMode match { - case NULL => + case NO_NULL => conf.setNullCheck(false) case EFFICIENT => conf.setEfficientTypeUsage(true) @@ -48,21 +47,14 @@ class TableProgramsTestBase( } object TableProgramsTestBase { - sealed trait TableConfigMode { def nullCheck: Boolean; def efficientTypes: Boolean } - object TableConfigMode { - case object DEFAULT extends TableConfigMode { - val nullCheck = false; val efficientTypes = false - } - case object NULL extends TableConfigMode { - val nullCheck = true; val efficientTypes = false - } - case object EFFICIENT extends TableConfigMode { - val nullCheck = false; val efficientTypes = true - } - } + case class TableConfigMode(nullCheck: Boolean, efficientTypes: Boolean) + + val DEFAULT = TableConfigMode(nullCheck = true, efficientTypes = false) + val NO_NULL = TableConfigMode(nullCheck = false, efficientTypes = false) + val EFFICIENT = TableConfigMode(nullCheck = false, efficientTypes = true) @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}") def parameters(): util.Collection[Array[java.lang.Object]] = { - Seq[Array[AnyRef]](Array(TestExecutionMode.COLLECTION, TableConfigMode.DEFAULT)) + Seq[Array[AnyRef]](Array(TestExecutionMode.COLLECTION, DEFAULT)) } } http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala new file mode 100644 index 0000000..578ad30 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData} +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.expressions.Literal +import org.apache.flink.api.table.{Row, TableEnvironment, TableException} +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +class CalcITCase extends StreamingMultipleProgramsTestBase { + + @Test + def testSimpleSelectAll(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3) + + val results = ds.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "1,1,Hi", + "2,2,Hello", + "3,2,Hello world") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testSelectFirst(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1) + + val results = ds.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList("1", "2", "3") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testSimpleSelectWithNaming(): Unit = { + + // verify ProjectMergeRule. + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv) + .select('_1 as 'a, '_2 as 'b, '_1 as 'c) + .select('a, 'b) + + val results = ds.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "1,1", "2,2", "3,2", "4,3", "5,3", "6,3", "7,4", + "8,4", "9,4", "10,4", "11,5", "12,5", "13,5", "14,5", "15,5", + "16,6", "17,6", "18,6", "19,6", "20,6", "21,6") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testSimpleSelectAllWithAs(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) + .select('a, 'b, 'c) + + val results = ds.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "1,1,Hi", + "2,2,Hello", + "3,2,Hello world") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test(expected = classOf[TableException]) + def testAsWithToFewFields(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b) + + val results = ds.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList("no") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test(expected = classOf[TableException]) + def testAsWithToManyFields(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd) + + val results = ds.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList("no") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test(expected = classOf[TableException]) + def testAsWithAmbiguousFields(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b) + + val results = ds.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList("no") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + + @Test(expected = classOf[TableException]) + def testOnlyFieldRefInAs(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd) + + val results = ds.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList("no") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testSimpleFilter(): Unit = { + /* + * Test simple filter + */ + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + StreamITCase.testResults = mutable.MutableList() + val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) + + val filterDs = ds.filter('a === 3) + val results = filterDs.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList("3,2,Hello world") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testAllRejectingFilter(): Unit = { + /* + * Test all-rejecting filter + */ + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + StreamITCase.testResults = mutable.MutableList() + val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) + + val filterDs = ds.filter( Literal(false) ) + val results = filterDs.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + assertEquals(true, StreamITCase.testResults.isEmpty) + } + + @Test + def testAllPassingFilter(): Unit = { + /* + * Test all-passing filter + */ + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + StreamITCase.testResults = mutable.MutableList() + val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) + + val filterDs = ds.filter( Literal(true) ) + val results = filterDs.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "1,1,Hi", + "2,2,Hello", + "3,2,Hello world") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testFilterOnIntegerTupleField(): Unit = { + /* + * Test filter on Integer tuple field. + */ + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + StreamITCase.testResults = mutable.MutableList() + val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) + + val filterDs = ds.filter( 'a % 2 === 0 ) + val results = filterDs.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "2,2,Hello", "4,3,Hello world, how are you?", + "6,3,Luke Skywalker", "8,4,Comment#2", "10,4,Comment#4", + "12,5,Comment#6", "14,5,Comment#8", "16,6,Comment#10", + "18,6,Comment#12", "20,6,Comment#14") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testNotEquals(): Unit = { + /* + * Test filter on Integer tuple field. + */ + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + StreamITCase.testResults = mutable.MutableList() + val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) + + val filterDs = ds.filter( 'a % 2 !== 0) + val results = filterDs.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + val expected = mutable.MutableList( + "1,1,Hi", "3,2,Hello world", + "5,3,I am fine.", "7,4,Comment#1", "9,4,Comment#3", + "11,5,Comment#5", "13,5,Comment#7", "15,5,Comment#9", + "17,6,Comment#11", "19,6,Comment#13", "21,6,Comment#15") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/FilterITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/FilterITCase.scala deleted file mode 100644 index 45b9b04..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/FilterITCase.scala +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.stream.table - -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData} -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.table.expressions.Literal -import org.apache.flink.api.table.{Row, TableEnvironment} -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase -import org.junit.Assert._ -import org.junit.Test - -import scala.collection.mutable - -class FilterITCase extends StreamingMultipleProgramsTestBase { - - @Test - def testSimpleFilter(): Unit = { - /* - * Test simple filter - */ - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - StreamITCase.testResults = mutable.MutableList() - val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) - - val filterDs = ds.filter('a === 3) - val results = filterDs.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = mutable.MutableList("3,2,Hello world") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } - - @Test - def testAllRejectingFilter(): Unit = { - /* - * Test all-rejecting filter - */ - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - StreamITCase.testResults = mutable.MutableList() - val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) - - val filterDs = ds.filter( Literal(false) ) - val results = filterDs.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) - env.execute() - - assertEquals(true, StreamITCase.testResults.isEmpty) - } - - @Test - def testAllPassingFilter(): Unit = { - /* - * Test all-passing filter - */ - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - StreamITCase.testResults = mutable.MutableList() - val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) - - val filterDs = ds.filter( Literal(true) ) - val results = filterDs.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = mutable.MutableList( - "1,1,Hi", - "2,2,Hello", - "3,2,Hello world") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } - - @Test - def testFilterOnIntegerTupleField(): Unit = { - /* - * Test filter on Integer tuple field. - */ - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - StreamITCase.testResults = mutable.MutableList() - val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) - - val filterDs = ds.filter( 'a % 2 === 0 ) - val results = filterDs.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = mutable.MutableList( - "2,2,Hello", "4,3,Hello world, how are you?", - "6,3,Luke Skywalker", "8,4,Comment#2", "10,4,Comment#4", - "12,5,Comment#6", "14,5,Comment#8", "16,6,Comment#10", - "18,6,Comment#12", "20,6,Comment#14") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } - - @Test - def testNotEquals(): Unit = { - /* - * Test filter on Integer tuple field. - */ - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - StreamITCase.testResults = mutable.MutableList() - val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) - - val filterDs = ds.filter( 'a % 2 !== 0) - val results = filterDs.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) - env.execute() - val expected = mutable.MutableList( - "1,1,Hi", "3,2,Hello world", - "5,3,I am fine.", "7,4,Comment#1", "9,4,Comment#3", - "11,5,Comment#5", "13,5,Comment#7", "15,5,Comment#9", - "17,6,Comment#11", "19,6,Comment#13", "21,6,Comment#15") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/SelectITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/SelectITCase.scala deleted file mode 100644 index c6a2139..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/SelectITCase.scala +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.stream.table - -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData} -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.table.{Row, TableEnvironment, TableException} -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase -import org.junit.Assert._ -import org.junit.Test - -import scala.collection.mutable - -class SelectITCase extends StreamingMultipleProgramsTestBase { - - @Test - def testSimpleSelectAll(): Unit = { - - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() - val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3) - - val results = ds.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = mutable.MutableList( - "1,1,Hi", - "2,2,Hello", - "3,2,Hello world") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } - - @Test - def testSelectFirst(): Unit = { - - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() - val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1) - - val results = ds.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = mutable.MutableList("1", "2", "3") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } - - @Test - def testSimpleSelectWithNaming(): Unit = { - - // verify ProjectMergeRule. - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() - val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv) - .select('_1 as 'a, '_2 as 'b, '_1 as 'c) - .select('a, 'b) - - val results = ds.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = mutable.MutableList( - "1,1", "2,2", "3,2", "4,3", "5,3", "6,3", "7,4", - "8,4", "9,4", "10,4", "11,5", "12,5", "13,5", "14,5", "15,5", - "16,6", "17,6", "18,6", "19,6", "20,6", "21,6") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } - - @Test - def testSimpleSelectAllWithAs(): Unit = { - - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() - val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) - .select('a, 'b, 'c) - - val results = ds.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = mutable.MutableList( - "1,1,Hi", - "2,2,Hello", - "3,2,Hello world") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } - - @Test(expected = classOf[TableException]) - def testAsWithToFewFields(): Unit = { - - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() - val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b) - - val results = ds.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = mutable.MutableList("no") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } - - @Test(expected = classOf[TableException]) - def testAsWithToManyFields(): Unit = { - - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() - val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd) - - val results = ds.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = mutable.MutableList("no") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } - - @Test(expected = classOf[TableException]) - def testAsWithAmbiguousFields(): Unit = { - - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() - val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b) - - val results = ds.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = mutable.MutableList("no") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } - - - @Test(expected = classOf[TableException]) - def testOnlyFieldRefInAs(): Unit = { - - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() - val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd) - - val results = ds.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = mutable.MutableList("no") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } -}