Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 038ED2009C6 for ; Sun, 1 May 2016 14:47:18 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 01E261609AF; Sun, 1 May 2016 14:47:18 +0200 (CEST) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C7FD41609AC for ; Sun, 1 May 2016 14:47:16 +0200 (CEST) Received: (qmail 89743 invoked by uid 500); 1 May 2016 12:47:15 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 89534 invoked by uid 99); 1 May 2016 12:47:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 01 May 2016 12:47:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6A807DFFB9; Sun, 1 May 2016 12:47:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vasia@apache.org To: commits@flink.apache.org Date: Sun, 01 May 2016 12:47:15 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/9] flink git commit: [FLINK-3847] Restructure flink-table test packages. archived-at: Sun, 01 May 2016 12:47:18 -0000 Repository: flink Updated Branches: refs/heads/master b48d2caf4 -> 290a566cd http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala deleted file mode 100644 index 4e1ae02..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.test.utils - -import org.apache.calcite.rel.logical.LogicalProject -import org.apache.calcite.rex.RexNode -import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR -import org.apache.calcite.tools.{Frameworks, RelBuilder} -import org.apache.flink.api.common.functions.{Function, MapFunction} -import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} -import org.apache.flink.api.java.{DataSet => JDataSet} -import org.apache.flink.api.table.{TableEnvironment, TableConfig} -import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction} -import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.api.table.runtime.FunctionCompiler -import org.mockito.Mockito._ - -/** - * Utility to translate and evaluate an RexNode or Table API expression to a String. - */ -object ExpressionEvaluator { - - // TestCompiler that uses current class loader - class TestCompiler[T <: Function] extends FunctionCompiler[T] { - def compile(genFunc: GeneratedFunction[T]): Class[T] = - compile(getClass.getClassLoader, genFunc.name, genFunc.code) - } - - private def prepareTable( - typeInfo: TypeInformation[Any]): (String, RelBuilder, TableEnvironment) = { - - // create DataSetTable - val dataSetMock = mock(classOf[DataSet[Any]]) - val jDataSetMock = mock(classOf[JDataSet[Any]]) - when(dataSetMock.javaSet).thenReturn(jDataSetMock) - when(jDataSetMock.getType).thenReturn(typeInfo) - - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val tableName = "myTable" - tEnv.registerDataSet(tableName, dataSetMock) - - // prepare RelBuilder - val relBuilder = tEnv.getRelBuilder - relBuilder.scan(tableName) - - (tableName, relBuilder, tEnv) - } - - def evaluate(data: Any, typeInfo: TypeInformation[Any], sqlExpr: String): String = { - // create DataSetTable - val table = prepareTable(typeInfo) - - // create RelNode from SQL expression - val planner = Frameworks.getPlanner(table._3.getFrameworkConfig) - val parsed = planner.parse("SELECT " + sqlExpr + " FROM " + table._1) - val validated = planner.validate(parsed) - val converted = planner.rel(validated) - - val expr: RexNode = converted.rel.asInstanceOf[LogicalProject].getChildExps.get(0) - - evaluate(data, typeInfo, table._2, expr) - } - - def evaluate(data: Any, typeInfo: TypeInformation[Any], expr: Expression): String = { - val relBuilder = prepareTable(typeInfo)._2 - evaluate(data, typeInfo, relBuilder, expr.toRexNode(relBuilder)) - } - - def evaluate( - data: Any, - typeInfo: TypeInformation[Any], - relBuilder: RelBuilder, - rexNode: RexNode): String = { - // generate code for Mapper - val config = new TableConfig() - val generator = new CodeGenerator(config, typeInfo) - val genExpr = generator.generateExpression(relBuilder.cast(rexNode, VARCHAR)) // cast to String - val bodyCode = - s""" - |${genExpr.code} - |return ${genExpr.resultTerm}; - |""".stripMargin - val genFunc = generator.generateFunction[MapFunction[Any, String]]( - "TestFunction", - classOf[MapFunction[Any, String]], - bodyCode, - STRING_TYPE_INFO.asInstanceOf[TypeInformation[Any]]) - - // compile and evaluate - val clazz = new TestCompiler[MapFunction[Any, String]]().compile(genFunc) - val mapper = clazz.newInstance() - mapper.map(data) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala deleted file mode 100644 index 723646b..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.test.utils - -import java.util - -import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaTableEnv} -import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv} -import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaTableEnv} -import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaEnv} -import org.apache.flink.api.table.{TableEnvironment, TableConfig} -import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode -import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode.{EFFICIENT, NULL} -import org.apache.flink.test.util.MultipleProgramsTestBase -import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.junit.runners.Parameterized - -import scala.collection.JavaConversions._ - -class TableProgramsTestBase( - mode: TestExecutionMode, - tableConfigMode: TableConfigMode) - extends MultipleProgramsTestBase(mode) { - - def config: TableConfig = { - val conf = new TableConfig - tableConfigMode match { - case NULL => - conf.setNullCheck(true) - case EFFICIENT => - conf.setEfficientTypeUsage(true) - case _ => // keep default - } - conf - } -} - -object TableProgramsTestBase { - sealed trait TableConfigMode { def nullCheck: Boolean; def efficientTypes: Boolean } - object TableConfigMode { - case object DEFAULT extends TableConfigMode { - val nullCheck = false; val efficientTypes = false - } - case object NULL extends TableConfigMode { - val nullCheck = true; val efficientTypes = false - } - case object EFFICIENT extends TableConfigMode { - val nullCheck = false; val efficientTypes = true - } - } - - @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}") - def tableConfigs(): util.Collection[Array[java.lang.Object]] = { - Seq( - // TODO more tests in cluster mode? - Array[AnyRef](TestExecutionMode.CLUSTER, TableConfigMode.DEFAULT), - Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.DEFAULT), - Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.NULL), - Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.EFFICIENT) - ) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala deleted file mode 100644 index 6d31187..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.typeutils - -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.api.common.typeutils.{ComparatorTestBase, TypeComparator, TypeSerializer} -import org.apache.flink.api.java.tuple -import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor} -import org.apache.flink.api.table.Row -import org.apache.flink.api.table.typeutils.RowComparatorTest.MyPojo -import org.junit.Assert._ - -class RowComparatorTest extends ComparatorTestBase[Row] { - - val typeInfo = new RowTypeInfo( - Array( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - new TupleTypeInfo[tuple.Tuple2[Int, Boolean]]( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.BOOLEAN_TYPE_INFO, - BasicTypeInfo.SHORT_TYPE_INFO), - TypeExtractor.createTypeInfo(classOf[MyPojo]))) - - val testPojo1 = new MyPojo() - // TODO we cannot test null here as PojoComparator has no support for null keys - testPojo1.name = "" - val testPojo2 = new MyPojo() - testPojo2.name = "Test1" - val testPojo3 = new MyPojo() - testPojo3.name = "Test2" - - val data: Array[Row] = Array( - createRow(null, null, null, null, null), - createRow(0, null, null, null, null), - createRow(0, 0.0, null, null, null), - createRow(0, 0.0, "a", null, null), - createRow(1, 0.0, "a", null, null), - createRow(1, 1.0, "a", null, null), - createRow(1, 1.0, "b", null, null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](1, false, 2), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, false, 2), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 2), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo1), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo2), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo3) - ) - - override protected def deepEquals(message: String, should: Row, is: Row): Unit = { - val arity = should.productArity - assertEquals(message, arity, is.productArity) - var index = 0 - while (index < arity) { - val copiedValue: Any = should.productElement(index) - val element: Any = is.productElement(index) - assertEquals(message, element, copiedValue) - index += 1 - } - } - - override protected def createComparator(ascending: Boolean): TypeComparator[Row] = { - typeInfo.createComparator( - Array(0, 1, 2, 3, 4, 5, 6), - Array(ascending, ascending, ascending, ascending, ascending, ascending, ascending), - 0, - new ExecutionConfig()) - } - - override protected def createSerializer(): TypeSerializer[Row] = { - typeInfo.createSerializer(new ExecutionConfig()) - } - - override protected def getSortedTestData: Array[Row] = { - data - } - - override protected def supportsNullKeys: Boolean = true - - def createRow(f0: Any, f1: Any, f2: Any, f3: Any, f4: Any): Row = { - val r: Row = new Row(5) - r.setField(0, f0) - r.setField(1, f1) - r.setField(2, f2) - r.setField(3, f3) - r.setField(4, f4) - r - } -} - -object RowComparatorTest { - class MyPojo() extends Serializable with Comparable[MyPojo] { - // we cannot use null because the PojoComparator does not support null properly - var name: String = "" - - override def compareTo(o: MyPojo): Int = { - if (name == null && o.name == null) { - 0 - } - else if (name == null) { - -1 - } - else if (o.name == null) { - 1 - } - else { - name.compareTo(o.name) - } - } - - override def equals(other: Any): Boolean = other match { - case that: MyPojo => compareTo(that) == 0 - case _ => false - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowSerializerTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowSerializerTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowSerializerTest.scala deleted file mode 100644 index 95a1bb5..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowSerializerTest.scala +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.typeutils - -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.api.common.typeutils.{SerializerTestInstance, TypeSerializer} -import org.apache.flink.api.java.tuple -import org.apache.flink.api.java.typeutils.{TypeExtractor, TupleTypeInfo} -import org.apache.flink.api.table.Row -import org.apache.flink.api.table.typeutils.RowSerializerTest.MyPojo -import org.junit.Assert._ -import org.junit.Test - -class RowSerializerTest { - - class RowSerializerTestInstance( - serializer: TypeSerializer[Row], - testData: Array[Row]) - extends SerializerTestInstance[Row](serializer, classOf[Row], -1, testData: _*) { - - override protected def deepEquals(message: String, should: Row, is: Row): Unit = { - val arity = should.productArity - assertEquals(message, arity, is.productArity) - var index = 0 - while (index < arity) { - val copiedValue: Any = should.productElement(index) - val element: Any = is.productElement(index) - assertEquals(message, element, copiedValue) - index += 1 - } - } - } - - @Test - def testRowSerializer(): Unit = { - val rowInfo: TypeInformation[Row] = new RowTypeInfo( - Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)) - - val row1 = new Row(2) - row1.setField(0, 1) - row1.setField(1, "a") - - val row2 = new Row(2) - row2.setField(0, 2) - row2.setField(1, null) - - val testData: Array[Row] = Array(row1, row2) - - val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig) - - val testInstance = new RowSerializerTestInstance(rowSerializer, testData) - - testInstance.testAll() - } - - @Test - def testLargeRowSerializer(): Unit = { - val rowInfo: TypeInformation[Row] = new RowTypeInfo(Seq( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO)) - - val row = new Row(13) - row.setField(0, 2) - row.setField(1, null) - row.setField(3, null) - row.setField(4, null) - row.setField(5, null) - row.setField(6, null) - row.setField(7, null) - row.setField(8, null) - row.setField(9, null) - row.setField(10, null) - row.setField(11, null) - row.setField(12, "Test") - - val testData: Array[Row] = Array(row) - - val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig) - - val testInstance = new RowSerializerTestInstance(rowSerializer, testData) - - testInstance.testAll() - } - - @Test - def testRowSerializerWithComplexTypes(): Unit = { - val rowInfo = new RowTypeInfo( - Array( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.DOUBLE_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - new TupleTypeInfo[tuple.Tuple2[Int, Boolean]]( - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.BOOLEAN_TYPE_INFO, - BasicTypeInfo.SHORT_TYPE_INFO), - TypeExtractor.createTypeInfo(classOf[MyPojo]))) - - val testPojo1 = new MyPojo() - testPojo1.name = null - val testPojo2 = new MyPojo() - testPojo2.name = "Test1" - val testPojo3 = new MyPojo() - testPojo3.name = "Test2" - - val testData: Array[Row] = Array( - createRow(null, null, null, null, null), - createRow(0, null, null, null, null), - createRow(0, 0.0, null, null, null), - createRow(0, 0.0, "a", null, null), - createRow(1, 0.0, "a", null, null), - createRow(1, 1.0, "a", null, null), - createRow(1, 1.0, "b", null, null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](1, false, 2), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, false, 2), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 2), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), null), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo1), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo2), - createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo3) - ) - - val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig) - - val testInstance = new RowSerializerTestInstance(rowSerializer, testData) - - testInstance.testAll() - } - - // ---------------------------------------------------------------------------------------------- - - def createRow(f0: Any, f1: Any, f2: Any, f3: Any, f4: Any): Row = { - val r: Row = new Row(5) - r.setField(0, f0) - r.setField(1, f1) - r.setField(2, f2) - r.setField(3, f3) - r.setField(4, f4) - r - } -} - -object RowSerializerTest { - class MyPojo() extends Serializable with Comparable[MyPojo] { - var name: String = null - - override def compareTo(o: MyPojo): Int = { - if (name == null && o.name == null) { - 0 - } - else if (name == null) { - -1 - } - else if (o.name == null) { - 1 - } - else { - name.compareTo(o.name) - } - } - - override def equals(other: Any): Boolean = other match { - case that: MyPojo => compareTo(that) == 0 - case _ => false - } - } -}