flink-commits mailing list archives

From: va...@apache.org
Subject: [1/9] flink git commit: [FLINK-3847] Restructure flink-table test packages.
Date: Sun, 01 May 2016 12:47:15 GMT
Repository: flink
Updated Branches:
  refs/heads/master b48d2caf4 -> 290a566cd


http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
deleted file mode 100644
index 4e1ae02..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.test.utils
-
-import org.apache.calcite.rel.logical.LogicalProject
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR
-import org.apache.calcite.tools.{Frameworks, RelBuilder}
-import org.apache.flink.api.common.functions.{Function, MapFunction}
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
-import org.apache.flink.api.java.{DataSet => JDataSet}
-import org.apache.flink.api.table.{TableEnvironment, TableConfig}
-import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction}
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.runtime.FunctionCompiler
-import org.mockito.Mockito._
-
-/**
-  * Utility to translate and evaluate an RexNode or Table API expression to a String.
-  */
-object ExpressionEvaluator {
-
-  // TestCompiler that uses current class loader
-  class TestCompiler[T <: Function] extends FunctionCompiler[T] {
-    def compile(genFunc: GeneratedFunction[T]): Class[T] =
-      compile(getClass.getClassLoader, genFunc.name, genFunc.code)
-  }
-
-  private def prepareTable(
-    typeInfo: TypeInformation[Any]): (String, RelBuilder, TableEnvironment) = {
-
-    // create DataSetTable
-    val dataSetMock = mock(classOf[DataSet[Any]])
-    val jDataSetMock = mock(classOf[JDataSet[Any]])
-    when(dataSetMock.javaSet).thenReturn(jDataSetMock)
-    when(jDataSetMock.getType).thenReturn(typeInfo)
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val tableName = "myTable"
-    tEnv.registerDataSet(tableName, dataSetMock)
-
-    // prepare RelBuilder
-    val relBuilder = tEnv.getRelBuilder
-    relBuilder.scan(tableName)
-
-    (tableName, relBuilder, tEnv)
-  }
-
-  def evaluate(data: Any, typeInfo: TypeInformation[Any], sqlExpr: String): String = {
-    // create DataSetTable
-    val table = prepareTable(typeInfo)
-
-    // create RelNode from SQL expression
-    val planner = Frameworks.getPlanner(table._3.getFrameworkConfig)
-    val parsed = planner.parse("SELECT " + sqlExpr + " FROM " + table._1)
-    val validated = planner.validate(parsed)
-    val converted = planner.rel(validated)
-
-    val expr: RexNode = converted.rel.asInstanceOf[LogicalProject].getChildExps.get(0)
-
-    evaluate(data, typeInfo, table._2, expr)
-  }
-
-  def evaluate(data: Any, typeInfo: TypeInformation[Any], expr: Expression): String = {
-    val relBuilder = prepareTable(typeInfo)._2
-    evaluate(data, typeInfo, relBuilder, expr.toRexNode(relBuilder))
-  }
-
-  def evaluate(
-      data: Any,
-      typeInfo: TypeInformation[Any],
-      relBuilder: RelBuilder,
-      rexNode: RexNode): String = {
-    // generate code for Mapper
-    val config = new TableConfig()
-    val generator = new CodeGenerator(config, typeInfo)
-    val genExpr = generator.generateExpression(relBuilder.cast(rexNode, VARCHAR)) // cast to String
-    val bodyCode =
-      s"""
-        |${genExpr.code}
-        |return ${genExpr.resultTerm};
-        |""".stripMargin
-    val genFunc = generator.generateFunction[MapFunction[Any, String]](
-      "TestFunction",
-      classOf[MapFunction[Any, String]],
-      bodyCode,
-      STRING_TYPE_INFO.asInstanceOf[TypeInformation[Any]])
-
-    // compile and evaluate
-    val clazz = new TestCompiler[MapFunction[Any, String]]().compile(genFunc)
-    val mapper = clazz.newInstance()
-    mapper.map(data)
-  }
-
-}
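
For context, a minimal usage sketch of the deleted utility (illustrative only, not part of this commit): it assumes a Java tuple input, whose default field names f0, f1, ... the SQL text references, and imports the pre-restructuring package shown above.

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, STRING_TYPE_INFO}
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    import org.apache.flink.api.java.typeutils.TupleTypeInfo
    import org.apache.flink.api.table.test.utils.ExpressionEvaluator

    object ExpressionEvaluatorSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical input record; Java tuples expose default field names f0, f1.
        val data = new JTuple2[String, Integer]("hello", 42)
        val typeInfo = new TupleTypeInfo[JTuple2[String, Integer]](
          STRING_TYPE_INFO, INT_TYPE_INFO).asInstanceOf[TypeInformation[Any]]

        // Parses "SELECT UPPER(f0) FROM myTable", generates and compiles a
        // MapFunction[Any, String], and applies it to the record.
        val result = ExpressionEvaluator.evaluate(data, typeInfo, "UPPER(f0)")
        println(result) // expected: "HELLO"
      }
    }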

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
deleted file mode 100644
index 723646b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.test.utils
-
-import java.util
-
-import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaTableEnv}
-import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv}
-import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaTableEnv}
-import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaEnv}
-import org.apache.flink.api.table.{TableEnvironment, TableConfig}
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode.{EFFICIENT, NULL}
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConversions._
-
-class TableProgramsTestBase(
-    mode: TestExecutionMode,
-    tableConfigMode: TableConfigMode)
-  extends MultipleProgramsTestBase(mode) {
-
-  def config: TableConfig = {
-    val conf = new TableConfig
-    tableConfigMode match {
-      case NULL =>
-        conf.setNullCheck(true)
-      case EFFICIENT =>
-        conf.setEfficientTypeUsage(true)
-      case _ => // keep default
-    }
-    conf
-  }
-}
-
-object TableProgramsTestBase {
-  sealed trait TableConfigMode { def nullCheck: Boolean; def efficientTypes: Boolean }
-  object TableConfigMode {
-    case object DEFAULT extends TableConfigMode {
-      val nullCheck = false; val efficientTypes = false
-    }
-    case object NULL extends TableConfigMode {
-      val nullCheck = true; val efficientTypes = false
-    }
-    case object EFFICIENT extends TableConfigMode {
-      val nullCheck = false; val efficientTypes = true
-    }
-  }
-
-  @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
-  def tableConfigs(): util.Collection[Array[java.lang.Object]] = {
-    Seq(
-      // TODO more tests in cluster mode?
-      Array[AnyRef](TestExecutionMode.CLUSTER, TableConfigMode.DEFAULT),
-      Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.DEFAULT),
-      Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.NULL),
-      Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.EFFICIENT)
-    )
-  }
-}
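
For context, a sketch of how an IT case would typically build on this base class (illustrative, not from the commit; the class name and test body are assumptions, and the parameters come from tableConfigs() above). It also assumes the two-argument TableEnvironment.getTableEnvironment(env, config) overload:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.table.TableEnvironment
    import org.apache.flink.api.table.test.utils.TableProgramsTestBase
    import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
    import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
    import org.junit.Test
    import org.junit.runner.RunWith
    import org.junit.runners.Parameterized

    @RunWith(classOf[Parameterized])
    class MyTableITCase(mode: TestExecutionMode, configMode: TableConfigMode)
      extends TableProgramsTestBase(mode, configMode) {

      @Test
      def testSomething(): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // `config` comes from the base class and reflects the parameterized
        // TableConfigMode (null checks or efficient type usage enabled).
        val tEnv = TableEnvironment.getTableEnvironment(env, config)
        // ... register a table, run a program, assert on the result
      }
    }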

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala
deleted file mode 100644
index 6d31187..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.common.typeutils.{ComparatorTestBase, TypeComparator, TypeSerializer}
-import org.apache.flink.api.java.tuple
-import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.typeutils.RowComparatorTest.MyPojo
-import org.junit.Assert._
-
-class RowComparatorTest extends ComparatorTestBase[Row] {
-
-  val typeInfo = new RowTypeInfo(
-    Array(
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.DOUBLE_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO,
-      new TupleTypeInfo[tuple.Tuple2[Int, Boolean]](
-        BasicTypeInfo.INT_TYPE_INFO,
-        BasicTypeInfo.BOOLEAN_TYPE_INFO,
-        BasicTypeInfo.SHORT_TYPE_INFO),
-      TypeExtractor.createTypeInfo(classOf[MyPojo])))
-
-  val testPojo1 = new MyPojo()
-  // TODO we cannot test null here as PojoComparator has no support for null keys
-  testPojo1.name = ""
-  val testPojo2 = new MyPojo()
-  testPojo2.name = "Test1"
-  val testPojo3 = new MyPojo()
-  testPojo3.name = "Test2"
-
-  val data: Array[Row] = Array(
-    createRow(null, null, null, null, null),
-    createRow(0, null, null, null, null),
-    createRow(0, 0.0, null, null, null),
-    createRow(0, 0.0, "a", null, null),
-    createRow(1, 0.0, "a", null, null),
-    createRow(1, 1.0, "a", null, null),
-    createRow(1, 1.0, "b", null, null),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](1, false, 2), null),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, false, 2), null),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 2), null),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), null),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo1),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo2),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo3)
-    )
-
-  override protected def deepEquals(message: String, should: Row, is: Row): Unit = {
-    val arity = should.productArity
-    assertEquals(message, arity, is.productArity)
-    var index = 0
-    while (index < arity) {
-      val copiedValue: Any = should.productElement(index)
-      val element: Any = is.productElement(index)
-      assertEquals(message, element, copiedValue)
-      index += 1
-    }
-  }
-
-  override protected def createComparator(ascending: Boolean): TypeComparator[Row] = {
-    typeInfo.createComparator(
-      Array(0, 1, 2, 3, 4, 5, 6),
-      Array(ascending, ascending, ascending, ascending, ascending, ascending, ascending),
-      0,
-      new ExecutionConfig())
-  }
-
-  override protected def createSerializer(): TypeSerializer[Row] = {
-    typeInfo.createSerializer(new ExecutionConfig())
-  }
-
-  override protected def getSortedTestData: Array[Row] = {
-    data
-  }
-
-  override protected def supportsNullKeys: Boolean = true
-
-  def createRow(f0: Any, f1: Any, f2: Any, f3: Any, f4: Any): Row = {
-    val r: Row = new Row(5)
-    r.setField(0, f0)
-    r.setField(1, f1)
-    r.setField(2, f2)
-    r.setField(3, f3)
-    r.setField(4, f4)
-    r
-  }
-}
-
-object RowComparatorTest {
-  class MyPojo() extends Serializable with Comparable[MyPojo] {
-    // we cannot use null because the PojoComparator does not support null properly
-    var name: String = ""
-
-    override def compareTo(o: MyPojo): Int = {
-      if (name == null && o.name == null) {
-        0
-      }
-      else if (name == null) {
-        -1
-      }
-      else if (o.name == null) {
-        1
-      }
-      else {
-        name.compareTo(o.name)
-      }
-    }
-
-    override def equals(other: Any): Boolean = other match {
-      case that: MyPojo => compareTo(that) == 0
-      case _ => false
-    }
-  }
-}
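
The ordering convention that the data array above relies on can also be checked directly. A minimal sketch (illustrative, not part of the commit) building the same kind of comparator for a single-field row:

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.table.Row
    import org.apache.flink.api.table.typeutils.RowTypeInfo

    object RowComparatorSketch {
      def main(args: Array[String]): Unit = {
        val info = new RowTypeInfo(Seq(BasicTypeInfo.INT_TYPE_INFO))
        // Same createComparator signature as used in the test: key fields,
        // sort orders, logical field offset, and the execution config.
        val comparator = info.createComparator(
          Array(0), Array(true), 0, new ExecutionConfig())

        val nullRow = new Row(1) // field 0 stays null
        val intRow = new Row(1)
        intRow.setField(0, 42)

        // Ascending order sorts null fields first, which is why the test data
        // starts with the all-null row.
        assert(comparator.compare(nullRow, intRow) < 0)
      }
    }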

http://git-wip-us.apache.org/repos/asf/flink/blob/290a566c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowSerializerTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowSerializerTest.scala
deleted file mode 100644
index 95a1bb5..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowSerializerTest.scala
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.common.typeutils.{SerializerTestInstance, TypeSerializer}
-import org.apache.flink.api.java.tuple
-import org.apache.flink.api.java.typeutils.{TypeExtractor, TupleTypeInfo}
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.typeutils.RowSerializerTest.MyPojo
-import org.junit.Assert._
-import org.junit.Test
-
-class RowSerializerTest {
-
-  class RowSerializerTestInstance(
-      serializer: TypeSerializer[Row],
-      testData: Array[Row])
-    extends SerializerTestInstance[Row](serializer, classOf[Row], -1, testData: _*) {
-
-    override protected def deepEquals(message: String, should: Row, is: Row): Unit = {
-      val arity = should.productArity
-      assertEquals(message, arity, is.productArity)
-      var index = 0
-      while (index < arity) {
-        val copiedValue: Any = should.productElement(index)
-        val element: Any = is.productElement(index)
-        assertEquals(message, element, copiedValue)
-        index += 1
-      }
-    }
-  }
-
-  @Test
-  def testRowSerializer(): Unit = {
-    val rowInfo: TypeInformation[Row] = new RowTypeInfo(
-      Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
-
-    val row1 = new Row(2)
-    row1.setField(0, 1)
-    row1.setField(1, "a")
-
-    val row2 = new Row(2)
-    row2.setField(0, 2)
-    row2.setField(1, null)
-
-    val testData: Array[Row] = Array(row1, row2)
-
-    val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig)
-
-    val testInstance = new RowSerializerTestInstance(rowSerializer, testData)
-
-    testInstance.testAll()
-  }
-
-  @Test
-  def testLargeRowSerializer(): Unit = {
-    val rowInfo: TypeInformation[Row] = new RowTypeInfo(Seq(
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO))
-
-    val row = new Row(13)
-    row.setField(0, 2)
-    row.setField(1, null)
-    row.setField(3, null)
-    row.setField(4, null)
-    row.setField(5, null)
-    row.setField(6, null)
-    row.setField(7, null)
-    row.setField(8, null)
-    row.setField(9, null)
-    row.setField(10, null)
-    row.setField(11, null)
-    row.setField(12, "Test")
-
-    val testData: Array[Row] = Array(row)
-
-    val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig)
-
-    val testInstance = new RowSerializerTestInstance(rowSerializer, testData)
-
-    testInstance.testAll()
-  }
-
-  @Test
-  def testRowSerializerWithComplexTypes(): Unit = {
-    val rowInfo = new RowTypeInfo(
-      Array(
-        BasicTypeInfo.INT_TYPE_INFO,
-        BasicTypeInfo.DOUBLE_TYPE_INFO,
-        BasicTypeInfo.STRING_TYPE_INFO,
-        new TupleTypeInfo[tuple.Tuple2[Int, Boolean]](
-          BasicTypeInfo.INT_TYPE_INFO,
-          BasicTypeInfo.BOOLEAN_TYPE_INFO,
-          BasicTypeInfo.SHORT_TYPE_INFO),
-        TypeExtractor.createTypeInfo(classOf[MyPojo])))
-
-    val testPojo1 = new MyPojo()
-    testPojo1.name = null
-    val testPojo2 = new MyPojo()
-    testPojo2.name = "Test1"
-    val testPojo3 = new MyPojo()
-    testPojo3.name = "Test2"
-
-    val testData: Array[Row] = Array(
-      createRow(null, null, null, null, null),
-      createRow(0, null, null, null, null),
-      createRow(0, 0.0, null, null, null),
-      createRow(0, 0.0, "a", null, null),
-      createRow(1, 0.0, "a", null, null),
-      createRow(1, 1.0, "a", null, null),
-      createRow(1, 1.0, "b", null, null),
-      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](1, false, 2), null),
-      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, false, 2), null),
-      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 2), null),
-      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), null),
-      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo1),
-      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo2),
-      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo3)
-    )
-
-    val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig)
-
-    val testInstance = new RowSerializerTestInstance(rowSerializer, testData)
-
-    testInstance.testAll()
-  }
-
-  // ----------------------------------------------------------------------------------------------
-
-  def createRow(f0: Any, f1: Any, f2: Any, f3: Any, f4: Any): Row = {
-    val r: Row = new Row(5)
-    r.setField(0, f0)
-    r.setField(1, f1)
-    r.setField(2, f2)
-    r.setField(3, f3)
-    r.setField(4, f4)
-    r
-  }
-}
-
-object RowSerializerTest {
-  class MyPojo() extends Serializable with Comparable[MyPojo] {
-    var name: String = null
-
-    override def compareTo(o: MyPojo): Int = {
-      if (name == null && o.name == null) {
-        0
-      }
-      else if (name == null) {
-        -1
-      }
-      else if (o.name == null) {
-        1
-      }
-      else {
-        name.compareTo(o.name)
-      }
-    }
-
-    override def equals(other: Any): Boolean = other match {
-      case that: MyPojo => compareTo(that) == 0
-      case _ => false
-    }
-  }
-}
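
As a standalone illustration of the null handling exercised above, a sketch (not part of the commit) that round-trips a Row with a null field through the serializer's copy method:

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.table.Row
    import org.apache.flink.api.table.typeutils.RowTypeInfo

    object RowSerializerSketch {
      def main(args: Array[String]): Unit = {
        val info = new RowTypeInfo(
          Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
        val serializer = info.createSerializer(new ExecutionConfig)

        val row = new Row(2)
        row.setField(0, 42)
        row.setField(1, null) // null fields must survive copy/serialization

        val copy = serializer.copy(row)
        assert(copy.productElement(0) == 42)
        assert(copy.productElement(1) == null)
      }
    }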

