flink-commits mailing list archives

From: fhue...@apache.org
Subject: [01/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.
Date: Fri, 16 Dec 2016 15:46:30 GMT
Repository: flink
Updated Branches:
  refs/heads/master 48ef46a4d -> 67c4be648


http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCorrelateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCorrelateITCase.scala
new file mode 100644
index 0000000..eb20517
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCorrelateITCase.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.datastream
+
+import org.apache.flink.api.scala._
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableFunc0
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class DataStreamCorrelateITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test
+  def testCrossJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+    val func0 = new TableFunc0
+
+    val result = t
+      .join(func0('c) as('d, 'e))
+      .select('c, 'd, 'e)
+      .toDataStream[Row]
+
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("Jack#22,Jack,22", "John#19,John,19", "Anna#44,Anna,44")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testLeftOuterJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+    val func0 = new TableFunc0
+
+    val result = t
+      .leftOuterJoin(func0('c) as('d, 'e))
+      .select('c, 'd, 'e)
+      .toDataStream[Row]
+
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "nosharp,null,null", "Jack#22,Jack,22",
+      "John#19,John,19", "Anna#44,Anna,44")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  private def testData(
+    env: StreamExecutionEnvironment)
+  : DataStream[(Int, Long, String)] = {
+
+    val data = new mutable.MutableList[(Int, Long, String)]
+    data += ((1, 1L, "Jack#22"))
+    data += ((2, 2L, "John#19"))
+    data += ((3, 2L, "Anna#44"))
+    data += ((4, 3L, "nosharp"))
+    env.fromCollection(data)
+  }
+
+}
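
For reference, the Table API joins above have a SQL counterpart. A minimal
sketch (not part of this commit), reusing tEnv, t, and func0 from
testCrossJoin and assuming the function is registered under the hypothetical
name "split" and the table under "MyTable":

    // Hypothetical SQL equivalent of testCrossJoin, using the era's
    // LATERAL TABLE syntax for table-function joins.
    tEnv.registerFunction("split", func0)
    tEnv.registerTable("MyTable", t)
    val sqlResult = tEnv
      .sql("SELECT c, d, e FROM MyTable, LATERAL TABLE(split(c)) AS T(d, e)")
      .toDataStream[Row]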

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
new file mode 100644
index 0000000..ff2c5d3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.{DataSet => JDataSet}
+import org.apache.flink.table.api.{Table, TableEnvironment}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
+import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.junit.Assert.assertEquals
+import org.mockito.Mockito.{mock, when}
+
+/**
+  * Test base for testing Table API / SQL plans.
+  */
+class TableTestBase {
+
+  def batchTestUtil(): BatchTableTestUtil = {
+    BatchTableTestUtil()
+  }
+
+  def streamTestUtil(): StreamTableTestUtil = {
+    StreamTableTestUtil()
+  }
+
+  def verifyTableEquals(expected: Table, actual: Table): Unit = {
+    assertEquals(
+      "Logical plans do not match",
+      RelOptUtil.toString(expected.getRelNode),
+      RelOptUtil.toString(actual.getRelNode))
+  }
+
+}
+
+abstract class TableTestUtil {
+
+  private var counter = 0
+
+  def addTable[T: TypeInformation](fields: Expression*): Table = {
+    counter += 1
+    addTable[T](s"Table$counter", fields: _*)
+  }
+
+  def addTable[T: TypeInformation](name: String, fields: Expression*): Table
+  def addFunction[T: TypeInformation](name: String, function: TableFunction[T]): TableFunction[T]
+  def addFunction(name: String, function: ScalarFunction): Unit
+
+  def verifySql(query: String, expected: String): Unit
+  def verifyTable(resultTable: Table, expected: String): Unit
+
+  // the print methods are for debugging purposes only
+  def printTable(resultTable: Table): Unit
+  def printSql(query: String): Unit
+}
+
+object TableTestUtil {
+
+  // these methods currently just simplify string construction;
+  // we could replace them with real logic later
+
+  def unaryNode(node: String, input: String, term: String*): String = {
+    s"""$node(${term.mkString(", ")})
+       |$input
+       |""".stripMargin.stripLineEnd
+  }
+
+  def binaryNode(node: String, left: String, right: String, term: String*): String = {
+    s"""$node(${term.mkString(", ")})
+       |$left
+       |$right
+       |""".stripMargin.stripLineEnd
+  }
+
+  def values(node: String, term: String*): String = {
+    s"$node(${term.mkString(", ")})"
+  }
+
+  def term(term: AnyRef, value: AnyRef*): String = {
+    s"$term=[${value.mkString(", ")}]"
+  }
+
+  def tuples(value: List[AnyRef]*): String = {
+    val listValues = value.map(listValue => s"{ ${listValue.mkString(", ")} }")
+    term("tuples", "[" + listValues.mkString(", ") + "]")
+  }
+
+  def batchTableNode(idx: Int): String = {
+    s"DataSetScan(table=[[_DataSetTable_$idx]])"
+  }
+
+  def streamTableNode(idx: Int): String = {
+    s"DataStreamScan(table=[[_DataStreamTable_$idx]])"
+  }
+
+}
+
+case class BatchTableTestUtil() extends TableTestUtil {
+
+  val env = mock(classOf[ExecutionEnvironment])
+  val tEnv = TableEnvironment.getTableEnvironment(env)
+
+  def addTable[T: TypeInformation](
+      name: String,
+      fields: Expression*)
+    : Table = {
+    val ds = mock(classOf[DataSet[T]])
+    val jDs = mock(classOf[JDataSet[T]])
+    when(ds.javaSet).thenReturn(jDs)
+    val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
+    when(jDs.getType).thenReturn(typeInfo)
+
+    val t = ds.toTable(tEnv, fields: _*)
+    tEnv.registerTable(name, t)
+    t
+  }
+
+  def addFunction[T: TypeInformation](
+      name: String,
+      function: TableFunction[T])
+    : TableFunction[T] = {
+    tEnv.registerFunction(name, function)
+    function
+  }
+
+  def addFunction(name: String, function: ScalarFunction): Unit = {
+    tEnv.registerFunction(name, function)
+  }
+
+  def verifySql(query: String, expected: String): Unit = {
+    verifyTable(tEnv.sql(query), expected)
+  }
+
+  def verifyTable(resultTable: Table, expected: String): Unit = {
+    val relNode = resultTable.getRelNode
+    val optimized = tEnv.optimize(relNode)
+    val actual = RelOptUtil.toString(optimized)
+    assertEquals(
+      expected.split("\n").map(_.trim).mkString("\n"),
+      actual.split("\n").map(_.trim).mkString("\n"))
+  }
+
+  def printTable(resultTable: Table): Unit = {
+    val relNode = resultTable.getRelNode
+    val optimized = tEnv.optimize(relNode)
+    println(RelOptUtil.toString(optimized))
+  }
+
+  def printSql(query: String): Unit = {
+    printTable(tEnv.sql(query))
+  }
+}
+
+case class StreamTableTestUtil() extends TableTestUtil {
+
+  val env = mock(classOf[StreamExecutionEnvironment])
+  val tEnv = TableEnvironment.getTableEnvironment(env)
+
+  def addTable[T: TypeInformation](
+      name: String,
+      fields: Expression*)
+    : Table = {
+
+    val ds = mock(classOf[DataStream[T]])
+    val jDs = mock(classOf[JDataStream[T]])
+    when(ds.javaStream).thenReturn(jDs)
+    val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
+    when(jDs.getType).thenReturn(typeInfo)
+
+    val t = ds.toTable(tEnv, fields: _*)
+    tEnv.registerTable(name, t)
+    t
+  }
+
+  def addFunction[T: TypeInformation](
+      name: String,
+      function: TableFunction[T])
+    : TableFunction[T] = {
+    tEnv.registerFunction(name, function)
+    function
+  }
+
+  def addFunction(name: String, function: ScalarFunction): Unit = {
+    tEnv.registerFunction(name, function)
+  }
+
+  def verifySql(query: String, expected: String): Unit = {
+    verifyTable(tEnv.sql(query), expected)
+  }
+
+  def verifyTable(resultTable: Table, expected: String): Unit = {
+    val relNode = resultTable.getRelNode
+    val optimized = tEnv.optimize(relNode)
+    val actual = RelOptUtil.toString(optimized)
+    assertEquals(
+      expected.split("\n").map(_.trim).mkString("\n"),
+      actual.split("\n").map(_.trim).mkString("\n"))
+  }
+
+  // the print methods are for debugging purposes only
+  def printTable(resultTable: Table): Unit = {
+    val relNode = resultTable.getRelNode
+    val optimized = tEnv.optimize(relNode)
+    println(RelOptUtil.toString(optimized))
+  }
+
+  def printSql(query: String): Unit = {
+    printTable(tEnv.sql(query))
+  }
+}
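
To make the string-construction helpers above concrete: a short sketch of how
unaryNode, term, and streamTableNode compose the expected-plan string that
verifyTable/verifySql compare against (the node name "DataStreamCalc" is
illustrative, not taken from this commit):

    import org.apache.flink.table.utils.TableTestUtil._

    val expected = unaryNode(
      "DataStreamCalc",     // illustrative logical node name
      streamTableNode(0),   // "DataStreamScan(table=[[_DataStreamTable_0]])"
      term("select", "c", "d", "e")
    )
    // expected ==
    // "DataStreamCalc(select=[c, d, e])\nDataStreamScan(table=[[_DataStreamTable_0]])"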

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
new file mode 100644
index 0000000..54861ea
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.utils
+
+import java.lang.Boolean
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.tuple.Tuple3
+import org.apache.flink.types.Row
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.TableFunction
+
+
+case class SimpleUser(name: String, age: Int)
+
+class TableFunc0 extends TableFunction[SimpleUser] {
+  // expects input elements in the format "<string>#<int>"; other values are ignored
+  def eval(user: String): Unit = {
+    if (user.contains("#")) {
+      val splits = user.split("#")
+      collect(SimpleUser(splits(0), splits(1).toInt))
+    }
+  }
+}
+
+class TableFunc1 extends TableFunction[String] {
+  def eval(str: String): Unit = {
+    if (str.contains("#")) {
+      str.split("#").foreach(collect)
+    }
+  }
+
+  def eval(str: String, prefix: String): Unit = {
+    if (str.contains("#")) {
+      str.split("#").foreach(s => collect(prefix + s))
+    }
+  }
+}
+
+
+class TableFunc2 extends TableFunction[Row] {
+  def eval(str: String): Unit = {
+    if (str.contains("#")) {
+      str.split("#").foreach({ s =>
+        val row = new Row(2)
+        row.setField(0, s)
+        row.setField(1, s.length)
+        collect(row)
+      })
+    }
+  }
+
+  override def getResultType: TypeInformation[Row] = {
+    new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
+                    BasicTypeInfo.INT_TYPE_INFO)
+  }
+}
+
+class HierarchyTableFunction extends SplittableTableFunction[Boolean, Integer] {
+  def eval(user: String): Unit = {
+    if (user.contains("#")) {
+      val splits = user.split("#")
+      val age = splits(1).toInt
+      collect(new Tuple3[String, Boolean, Integer](splits(0), age >= 20, age))
+    }
+  }
+}
+
+abstract class SplittableTableFunction[A, B] extends TableFunction[Tuple3[String, A, B]] {}
+
+class PojoTableFunc extends TableFunction[PojoUser] {
+  def eval(user: String): Unit = {
+    if (user.contains("#")) {
+      val splits = user.split("#")
+      collect(new PojoUser(splits(0), splits(1).toInt))
+    }
+  }
+}
+
+class PojoUser() {
+  var name: String = _
+  var age: Int = 0
+
+  def this(name: String, age: Int) {
+    this()
+    this.name = name
+    this.age = age
+  }
+}
+
+// ----------------------------------------------------------------------------------------------
+// Invalid Table Functions
+// ----------------------------------------------------------------------------------------------
+
+
+// used to check that a Scala object is rejected as a table function
+object ObjectTableFunction extends TableFunction[Integer] {
+  def eval(a: Int, b: Int): Unit = {
+    collect(a)
+    collect(b)
+  }
+}
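
A hypothetical negative test for the object above, assuming registration of a
Scala object fails with a ValidationException (the exception type and test
name are assumptions, not taken from this commit):

    // Hypothetical: Scala objects are singletons and are expected to be
    // rejected when registered as table functions.
    @Test(expected = classOf[ValidationException])
    def testScalaObjectIsForbidden(): Unit = {
      val util = streamTestUtil()
      util.addFunction("objFunc", ObjectTableFunction)
    }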

