flink-commits mailing list archives

From twal...@apache.org
Subject [01/44] flink git commit: [FLINK-6617][table] Improve consistency of Java and Scala logical plan tests
Date Thu, 13 Jul 2017 10:18:10 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9bfc927ea -> f1fafc0e1


http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/validation/TableSinksValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/validation/TableSinksValidationTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/validation/TableSinksValidationTest.scala
new file mode 100644
index 0000000..c800289
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/validation/TableSinksValidationTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.sinks.validation
+
+import org.apache.flink.table.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.table.runtime.datastream.table.TestAppendSink
+import org.junit.Test
+
+class TableSinksValidationTest extends StreamingMultipleProgramsTestBase {
+
+  @Test(expected = classOf[TableException])
+  def testAppendSinkOnUpdatingTable(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num, 'text)
+
+    t.groupBy('text)
+      .select('text, 'id.count, 'num.sum)
+      .writeToSink(new TestAppendSink)
+
+    // must fail because table is not append-only
+    env.execute()
+  }
+
+}
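
The TestAppendSink referenced above lives in the test utilities under org.apache.flink.table.runtime.datastream.table. For readers following along, here is a minimal sketch of such an append-only sink, assuming the 1.3-era TableSink and AppendStreamTableSink interfaces; the class name and body are illustrative, not the committed code:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.sinks.{AppendStreamTableSink, TableSink}
import org.apache.flink.types.Row

// Sketch only: illustrates the append-only contract that makes the test
// above throw a TableException when it receives an updating table.
class SketchAppendSink extends AppendStreamTableSink[Row] {
  private var fieldNames: Array[String] = _
  private var fieldTypes: Array[TypeInformation[_]] = _

  // Append-only sinks receive inserts only; the grouped aggregation in the
  // test produces updates, so the planner must reject this combination.
  override def emitDataStream(dataStream: DataStream[Row]): Unit =
    dataStream.print()

  override def getOutputType: TypeInformation[Row] =
    new RowTypeInfo(fieldTypes, fieldNames)

  override def getFieldNames: Array[String] = fieldNames

  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes

  override def configure(
      names: Array[String],
      types: Array[TypeInformation[_]]): TableSink[Row] = {
    val sink = new SketchAppendSink
    sink.fieldNames = names
    sink.fieldTypes = types
    sink
  }
}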

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTest.scala
new file mode 100644
index 0000000..b01453d
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTest.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.{Assert, Test}
+
+class TableSourceTest extends TableSourceTestBase {
+
+  @Test
+  def testTableSourceScanToString(): Unit = {
+    val (tableSource1, _) = filterableTableSource
+    val (tableSource2, _) = filterableTableSource
+    val util = batchTestUtil()
+    val tEnv = util.tableEnv
+
+    tEnv.registerTableSource("table1", tableSource1)
+    tEnv.registerTableSource("table2", tableSource2)
+
+    val table1 = tEnv.scan("table1").where("amount > 2")
+    val table2 = tEnv.scan("table2").where("amount > 2")
+    val result = table1.unionAll(table2)
+
+    val expected = binaryNode(
+      "DataSetUnion",
+      batchFilterableSourceTableNode(
+        "table1",
+        Array("name", "id", "amount", "price"),
+        "'amount > 2"),
+      batchFilterableSourceTableNode(
+        "table2",
+        Array("name", "id", "amount", "price"),
+        "'amount > 2"),
+      term("union", "name, id, amount, price")
+    )
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testCsvTableSourceBuilder(): Unit = {
+    val source1 = CsvTableSource.builder()
+      .path("/path/to/csv")
+      .field("myfield", Types.STRING)
+      .field("myfield2", Types.INT)
+      .quoteCharacter(';')
+      .fieldDelimiter("#")
+      .lineDelimiter("\r\n")
+      .commentPrefix("%%")
+      .ignoreFirstLine()
+      .ignoreParseErrors()
+      .build()
+
+    val source2 = new CsvTableSource(
+      "/path/to/csv",
+      Array("myfield", "myfield2"),
+      Array(Types.STRING, Types.INT),
+      "#",
+      "\r\n",
+      ';',
+      true,
+      "%%",
+      true)
+
+    Assert.assertEquals(source1, source2)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTestBase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTestBase.scala
new file mode 100644
index 0000000..d408694
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTestBase.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.sources
+
+import org.apache.flink.table.utils.{CommonTestData, TableTestBase, TestFilterableTableSource}
+
+class TableSourceTestBase extends TableTestBase {
+
+  protected val projectedFields: Array[String] = Array("last", "id", "score")
+  protected val noCalcFields: Array[String] = Array("id", "score", "first")
+
+  def filterableTableSource: (TableSource[_], String) = {
+    val tableSource = new TestFilterableTableSource
+    (tableSource, "filterableTable")
+  }
+
+  def csvTable: (CsvTableSource, String) = {
+    val csvTable = CommonTestData.getCsvTableSource
+    val tableName = "csvTable"
+    (csvTable, tableName)
+  }
+
+  def batchSourceTableNode(sourceName: String, fields: Array[String]): String = {
+    s"BatchTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+  }
+
+  def streamSourceTableNode(sourceName: String, fields: Array[String]): String = {
+    s"StreamTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+  }
+
+  def batchFilterableSourceTableNode(
+      sourceName: String,
+      fields: Array[String],
+      exp: String): String = {
+    "BatchTableSourceScan(" +
+      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], source=[filter=[$exp]])"
+  }
+
+  def streamFilterableSourceTableNode(
+      sourceName: String,
+      fields: Array[String],
+      exp: String): String = {
+    "StreamTableSourceScan(" +
+      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], source=[filter=[$exp]])"
+  }
+}
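
The TestFilterableTableSource these helpers hand out implements filter push-down, which is why the expected plan strings above render a source=[filter=[...]] term. A sketch of the predicate-splitting contract it presumably follows, assuming the FilterableTableSource convention that applyPredicate receives a mutable list and removes the predicates it consumes; the object name and the 'amount-based check are illustrative:

import java.util.{ArrayList => JArrayList, List => JList}

import org.apache.flink.table.expressions.Expression

object PredicateSplitSketch {

  // Returns the predicates the source claims for itself and removes them
  // from the input list, so the planner keeps only the remainder in the
  // DataSetCalc / DataStreamCalc node.
  def splitPredicates(predicates: JList[Expression]): JList[Expression] = {
    val pushed = new JArrayList[Expression]()
    val it = predicates.iterator()
    while (it.hasNext) {
      val p = it.next()
      // toString-based check keeps the sketch independent of expression internals
      if (p.toString.contains("amount")) {
        pushed.add(p) // consumed: rendered as source=[filter=['amount > 2]]
        it.remove()   // dropped from the list: will not appear in the Calc's where
      }
    }
    pushed
  }
}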

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/batch/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/batch/TableSourceTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/batch/TableSourceTest.scala
new file mode 100644
index 0000000..f670452
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/batch/TableSourceTest.scala
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.batch
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils._
+import org.apache.flink.table.sources.{CsvTableSource, TableSource}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{CommonTestData, TableTestBase, TestFilterableTableSource}
+import org.junit.{Assert, Test}
+
+class TableSourceTest extends TableTestBase {
+
+  private val projectedFields: Array[String] = Array("last", "id", "score")
+  private val noCalcFields: Array[String] = Array("id", "score", "first")
+
+  @Test
+  def testTableSourceScanToString(): Unit = {
+    val (tableSource1, _) = filterableTableSource
+    val (tableSource2, _) = filterableTableSource
+    val util = batchTestUtil()
+    val tEnv = util.tableEnv
+
+    tEnv.registerTableSource("table1", tableSource1)
+    tEnv.registerTableSource("table2", tableSource2)
+
+    val table1 = tEnv.scan("table1").where("amount > 2")
+    val table2 = tEnv.scan("table2").where("amount > 2")
+    val result = table1.unionAll(table2)
+
+    val expected = binaryNode(
+      "DataSetUnion",
+      batchFilterableSourceTableNode(
+        "table1",
+        Array("name", "id", "amount", "price"),
+        "'amount > 2"),
+      batchFilterableSourceTableNode(
+        "table2",
+        Array("name", "id", "amount", "price"),
+        "'amount > 2"),
+      term("union", "name, id, amount, price")
+    )
+    util.verifyTable(result, expected)
+  }
+
+  // batch plan
+
+  @Test
+  def testBatchProjectableSourceScanPlanTableApi(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = batchTestUtil()
+    val tEnv = util.tableEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('last.upperCase(), 'id.floor(), 'score * 2)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchSourceTableNode(tableName, projectedFields),
+      term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchProjectableSourceScanPlanSQL(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = batchTestUtil()
+
+    util.tableEnv.registerTableSource(tableName, tableSource)
+
+    val sqlQuery = s"SELECT `last`, floor(id), score * 2 FROM $tableName"
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchSourceTableNode(tableName, projectedFields),
+      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testBatchProjectableSourceScanNoIdentityCalc(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = batchTestUtil()
+    val tEnv = util.tableEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('id, 'score, 'first)
+
+    val expected = batchSourceTableNode(tableName, noCalcFields)
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchFilterableWithoutPushDown(): Unit = {
+    val (tableSource, tableName) = filterableTableSource
+    val util = batchTestUtil()
+    val tEnv = util.tableEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+        .scan(tableName)
+        .select('price, 'id, 'amount)
+        .where("price * 2 < 32")
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchSourceTableNode(
+        tableName,
+        Array("name", "id", "amount", "price")),
+      term("select", "price", "id", "amount"),
+      term("where", "<(*(price, 2), 32)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchFilterablePartialPushDown(): Unit = {
+    val (tableSource, tableName) = filterableTableSource
+    val util = batchTestUtil()
+    val tEnv = util.tableEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .where("amount > 2 && price * 2 < 32")
+      .select('price, 'name.lowerCase(), 'amount)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchFilterableSourceTableNode(
+        tableName,
+        Array("name", "id", "amount", "price"),
+        "'amount > 2"),
+      term("select", "price", "LOWER(name) AS _c1", "amount"),
+      term("where", "<(*(price, 2), 32)")
+    )
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchFilterableFullyPushedDown(): Unit = {
+    val (tableSource, tableName) = filterableTableSource
+    val util = batchTestUtil()
+    val tEnv = util.tableEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+        .scan(tableName)
+        .select('price, 'id, 'amount)
+        .where("amount > 2 && amount < 32")
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchFilterableSourceTableNode(
+        tableName,
+        Array("name", "id", "amount", "price"),
+        "'amount > 2 && 'amount < 32"),
+      term("select", "price", "id", "amount")
+    )
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchFilterableWithUnconvertedExpression(): Unit = {
+    val (tableSource, tableName) = filterableTableSource
+    val util = batchTestUtil()
+    val tEnv = util.tableEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+        .scan(tableName)
+        .select('price, 'id, 'amount)
+        .where("amount > 2 && (amount < 32 || amount.cast(LONG) > 10)")
// cast can not be converted
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchFilterableSourceTableNode(
+        tableName,
+        Array("name", "id", "amount", "price"),
+        "'amount > 2"),
+      term("select", "price", "id", "amount"),
+      term("where", "OR(<(amount, 32), >(CAST(amount), 10))")
+    )
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchFilterableWithUDF(): Unit = {
+    val (tableSource, tableName) = filterableTableSource
+    val util = batchTestUtil()
+    val tEnv = util.tableEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+    val func = Func0
+    tEnv.registerFunction("func0", func)
+
+    val result = tEnv
+        .scan(tableName)
+        .select('price, 'id, 'amount)
+        .where("amount > 2 && func0(amount) < 32")
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchFilterableSourceTableNode(
+        tableName,
+        Array("name", "id", "amount", "price"),
+        "'amount > 2"),
+      term("select", "price", "id", "amount"),
+      term("where", s"<(${func.functionIdentifier}(amount), 32)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  // stream plan
+
+  @Test
+  def testStreamProjectableSourceScanPlanTableApi(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = streamTestUtil()
+    val tEnv = util.tableEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('last, 'id.floor(), 'score * 2)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamSourceTableNode(tableName, projectedFields),
+      term("select", "last", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testStreamProjectableSourceScanPlanSQL(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = streamTestUtil()
+
+    util.tableEnv.registerTableSource(tableName, tableSource)
+
+    val sqlQuery = s"SELECT `last`, floor(id), score * 2 FROM $tableName"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamSourceTableNode(tableName, projectedFields),
+      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testStreamProjectableSourceScanNoIdentityCalc(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = streamTestUtil()
+    val tEnv = util.tableEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('id, 'score, 'first)
+
+    val expected = streamSourceTableNode(tableName, noCalcFields)
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testStreamFilterableSourceScanPlanTableApi(): Unit = {
+    val (tableSource, tableName) = filterableTableSource
+    val util = streamTestUtil()
+    val tEnv = util.tableEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('price, 'id, 'amount)
+      .where("amount > 2 && price * 2 < 32")
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamFilterableSourceTableNode(
+        tableName,
+        Array("name", "id", "amount", "price"),
+        "'amount > 2"),
+      term("select", "price", "id", "amount"),
+      term("where", "<(*(price, 2), 32)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  // csv builder
+
+  @Test
+  def testCsvTableSourceBuilder(): Unit = {
+    val source1 = CsvTableSource.builder()
+      .path("/path/to/csv")
+      .field("myfield", Types.STRING)
+      .field("myfield2", Types.INT)
+      .quoteCharacter(';')
+      .fieldDelimiter("#")
+      .lineDelimiter("\r\n")
+      .commentPrefix("%%")
+      .ignoreFirstLine()
+      .ignoreParseErrors()
+      .build()
+
+    val source2 = new CsvTableSource(
+      "/path/to/csv",
+      Array("myfield", "myfield2"),
+      Array(Types.STRING, Types.INT),
+      "#",
+      "\r\n",
+      ';',
+      true,
+      "%%",
+      true)
+
+    Assert.assertEquals(source1, source2)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCsvTableSourceBuilderWithNullPath(): Unit = {
+    CsvTableSource.builder()
+      .field("myfield", Types.STRING)
+      // should fail, path is not defined
+      .build()
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = {
+    CsvTableSource.builder()
+      .path("/path/to/csv")
+      .field("myfield", Types.STRING)
+      // should fail, field names must not be duplicated
+      .field("myfield", Types.INT)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCsvTableSourceBuilderWithEmptyField(): Unit = {
+    CsvTableSource.builder()
+      .path("/path/to/csv")
+      // should fail, at least one field must be defined
+      .build()
+  }
+
+  // utils
+
+  def filterableTableSource: (TableSource[_], String) = {
+    val tableSource = new TestFilterableTableSource
+    (tableSource, "filterableTable")
+  }
+
+  def csvTable: (CsvTableSource, String) = {
+    val csvTable = CommonTestData.getCsvTableSource
+    val tableName = "csvTable"
+    (csvTable, tableName)
+  }
+
+  def batchSourceTableNode(sourceName: String, fields: Array[String]): String = {
+    s"BatchTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+  }
+
+  def streamSourceTableNode(sourceName: String, fields: Array[String]): String = {
+    s"StreamTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+  }
+
+  def batchFilterableSourceTableNode(
+      sourceName: String,
+      fields: Array[String],
+      exp: String): String = {
+    "BatchTableSourceScan(" +
+      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], source=[filter=[$exp]])"
+  }
+
+  def streamFilterableSourceTableNode(
+      sourceName: String,
+      fields: Array[String],
+      exp: String): String = {
+    "StreamTableSourceScan(" +
+      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], source=[filter=[$exp]])"
+  }
+}
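
The partial push-down cases above also have a natural SQL counterpart. Here is a hypothetical extra test body (not part of this commit) that would sit in the class above and drive the same expectation through verifySql, assuming the optimized plan string matches the Table API variant:

  @Test
  def testBatchFilterablePartialPushDownSQL(): Unit = {
    val (tableSource, tableName) = filterableTableSource
    val util = batchTestUtil()

    util.tableEnv.registerTableSource(tableName, tableSource)

    val sqlQuery = s"SELECT price, id, amount FROM $tableName " +
      "WHERE amount > 2 AND price * 2 < 32"

    // Assumed to mirror the Table API variant above: 'amount > 2 is consumed
    // by the source, while the price predicate stays in the DataSetCalc.
    val expected = unaryNode(
      "DataSetCalc",
      batchFilterableSourceTableNode(
        tableName,
        Array("name", "id", "amount", "price"),
        "'amount > 2"),
      term("select", "price", "id", "amount"),
      term("where", "<(*(price, 2), 32)")
    )
    util.verifySql(sqlQuery, expected)
  }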

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/sql/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/sql/TableSourceTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/sql/TableSourceTest.scala
new file mode 100644
index 0000000..e6d6480
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/sql/TableSourceTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.stream.sql
+
+import org.apache.flink.table.sources.TableSourceTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class TableSourceTest extends TableSourceTestBase {
+
+  @Test
+  def testProjectableSourceScanPlan(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = streamTestUtil()
+
+    util.tableEnv.registerTableSource(tableName, tableSource)
+
+    val sqlQuery = s"SELECT `last`, floor(id), score * 2 FROM $tableName"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamSourceTableNode(tableName, projectedFields),
+      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/table/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/table/TableSourceTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/table/TableSourceTest.scala
new file mode 100644
index 0000000..c43913b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/table/TableSourceTest.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.stream.table
+
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sources.TableSourceTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class TableSourceTest extends TableSourceTestBase {
+
+  @Test
+  def testProjectableSourceScanPlan(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = streamTestUtil()
+    val tEnv = util.tableEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('last, 'id.floor(), 'score * 2)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamSourceTableNode(tableName, projectedFields),
+      term("select", "last", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testProjectableSourceScanNoIdentityCalc(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = streamTestUtil()
+    val tEnv = util.tableEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('id, 'score, 'first)
+
+    val expected = streamSourceTableNode(tableName, noCalcFields)
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testFilterableSourceScanPlan(): Unit = {
+    val (tableSource, tableName) = filterableTableSource
+    val util = streamTestUtil()
+    val tEnv = util.tableEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('price, 'id, 'amount)
+      .where("amount > 2 && price * 2 < 32")
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamFilterableSourceTableNode(
+        tableName,
+        Array("name", "id", "amount", "price"),
+        "'amount > 2"),
+      term("select", "price", "id", "amount"),
+      term("where", "<(*(price, 2), 32)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/validation/TableSourceBalidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/validation/TableSourceBalidationTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/validation/TableSourceBalidationTest.scala
new file mode 100644
index 0000000..e575ab8
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/validation/TableSourceBalidationTest.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.validation
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.sources.CsvTableSource
+import org.apache.flink.table.sources.TableSourceTestBase
+import org.junit.Test
+
+class TableSourceBalidationTest extends TableSourceTestBase {
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCsvTableSourceBuilderWithNullPath(): Unit = {
+    CsvTableSource.builder()
+      .field("myfield", Types.STRING)
+      // should fail, path is not defined
+      .build()
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = {
+    CsvTableSource.builder()
+      .path("/path/to/csv")
+      .field("myfield", Types.STRING)
+      // should fail, field names must not be duplicated
+      .field("myfield", Types.INT)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCsvTableSourceBuilderWithEmptyField(): Unit = {
+    CsvTableSource.builder()
+      .path("/path/to/csv")
+      // should fail, at least one field must be defined
+      .build()
+  }
+}
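
For contrast with the three failure cases above, a sketch of the minimal configuration the builder accepts (a hypothetical companion test for the class above): path plus at least one field, with all other settings left at their defaults:

  @Test
  def testCsvTableSourceBuilderMinimal(): Unit = {
    // Path plus at least one field is the minimum the builder accepts;
    // delimiters, quoting, and error handling keep their defaults.
    CsvTableSource.builder()
      .path("/path/to/csv")
      .field("myfield", Types.STRING)
      .build()
  }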

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 402a69d..6657d50 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.{DataSet => JDataSet}
 import org.apache.flink.table.api.{Table, TableEnvironment}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.java.{ExecutionEnvironment => JExecutionEnvironment}
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
@@ -62,7 +63,6 @@ class TableTestBase {
       LogicalPlanFormatUtils.formatTempTableId(RelOptUtil.toString(expected.getRelNode)),
       LogicalPlanFormatUtils.formatTempTableId(RelOptUtil.toString(actual.getRelNode)))
   }
-
 }
 
 abstract class TableTestUtil {
@@ -75,14 +75,18 @@ abstract class TableTestUtil {
   }
 
   def addTable[T: TypeInformation](name: String, fields: Expression*): Table
+
   def addFunction[T: TypeInformation](name: String, function: TableFunction[T]): TableFunction[T]
+
   def addFunction(name: String, function: ScalarFunction): Unit
 
   def verifySql(query: String, expected: String): Unit
+
   def verifyTable(resultTable: Table, expected: String): Unit
 
   // the print methods are for debugging purposes only
   def printTable(resultTable: Table): Unit
+
   def printSql(query: String): Unit
 }
 
@@ -112,9 +116,9 @@ object TableTestUtil {
     s"$term=[${value.mkString(", ")}]"
   }
 
-  def tuples(value:List[AnyRef]*): String={
-    val listValues = value.map( listValue => s"{ ${listValue.mkString(", ")} }")
-    term("tuples","[" + listValues.mkString(", ") + "]")
+  def tuples(value: List[AnyRef]*): String = {
+    val listValues = value.map(listValue => s"{ ${listValue.mkString(", ")} }")
+    term("tuples", "[" + listValues.mkString(", ") + "]")
   }
 
   def batchTableNode(idx: Int): String = {
@@ -128,9 +132,10 @@ object TableTestUtil {
 }
 
 case class BatchTableTestUtil() extends TableTestUtil {
-
+  val javaEnv = mock(classOf[JExecutionEnvironment])
+  val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
   val env = mock(classOf[ExecutionEnvironment])
-  val tEnv = TableEnvironment.getTableEnvironment(env)
+  val tableEnv = TableEnvironment.getTableEnvironment(env)
 
   def addTable[T: TypeInformation](
       name: String,
@@ -142,8 +147,18 @@ case class BatchTableTestUtil() extends TableTestUtil {
     val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
     when(jDs.getType).thenReturn(typeInfo)
 
-    val t = ds.toTable(tEnv, fields: _*)
-    tEnv.registerTable(name, t)
+    val t = ds.toTable(tableEnv, fields: _*)
+    tableEnv.registerTable(name, t)
+    t
+  }
+
+  def addJavaTable[T](typeInfo: TypeInformation[T], name: String, fields: String): Table = {
+
+    val jDs = mock(classOf[JDataSet[T]])
+    when(jDs.getType).thenReturn(typeInfo)
+
+    val t = javaTableEnv.fromDataSet(jDs, fields)
+    javaTableEnv.registerTable(name, t)
     t
   }
 
@@ -151,27 +166,27 @@ case class BatchTableTestUtil() extends TableTestUtil {
       name: String,
       function: TableFunction[T])
     : TableFunction[T] = {
-    tEnv.registerFunction(name, function)
+    tableEnv.registerFunction(name, function)
     function
   }
 
   def addFunction(name: String, function: ScalarFunction): Unit = {
-    tEnv.registerFunction(name, function)
+    tableEnv.registerFunction(name, function)
   }
 
-  def addFunction[T:TypeInformation, ACC:TypeInformation](
+  def addFunction[T: TypeInformation, ACC: TypeInformation](
       name: String,
       function: AggregateFunction[T, ACC]): Unit = {
-    tEnv.registerFunction(name, function)
+    tableEnv.registerFunction(name, function)
   }
 
   def verifySql(query: String, expected: String): Unit = {
-    verifyTable(tEnv.sql(query), expected)
+    verifyTable(tableEnv.sql(query), expected)
   }
 
   def verifyTable(resultTable: Table, expected: String): Unit = {
     val relNode = resultTable.getRelNode
-    val optimized = tEnv.optimize(relNode)
+    val optimized = tableEnv.optimize(relNode)
     val actual = RelOptUtil.toString(optimized)
     assertEquals(
       expected.split("\n").map(_.trim).mkString("\n"),
@@ -180,22 +195,24 @@ case class BatchTableTestUtil() extends TableTestUtil {
 
   def printTable(resultTable: Table): Unit = {
     val relNode = resultTable.getRelNode
-    val optimized = tEnv.optimize(relNode)
+    val optimized = tableEnv.optimize(relNode)
     println(RelOptUtil.toString(optimized))
   }
 
   def printSql(query: String): Unit = {
-    printTable(tEnv.sql(query))
+    printTable(tableEnv.sql(query))
   }
+
 }
 
 case class StreamTableTestUtil() extends TableTestUtil {
 
   val javaEnv = mock(classOf[JStreamExecutionEnvironment])
   when(javaEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime)
+  val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
   val env = mock(classOf[StreamExecutionEnvironment])
   when(env.getWrappedStreamExecutionEnvironment).thenReturn(javaEnv)
-  val tEnv = TableEnvironment.getTableEnvironment(env)
+  val tableEnv = TableEnvironment.getTableEnvironment(env)
 
   def addTable[T: TypeInformation](
       name: String,
@@ -208,8 +225,18 @@ case class StreamTableTestUtil() extends TableTestUtil {
     val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
     when(jDs.getType).thenReturn(typeInfo)
 
-    val t = ds.toTable(tEnv, fields: _*)
-    tEnv.registerTable(name, t)
+    val t = ds.toTable(tableEnv, fields: _*)
+    tableEnv.registerTable(name, t)
+    t
+  }
+
+  def addJavaTable[T](typeInfo: TypeInformation[T], name: String, fields: String): Table = {
+
+    val jDs = mock(classOf[JDataStream[T]])
+    when(jDs.getType).thenReturn(typeInfo)
+
+    val t = javaTableEnv.fromDataStream(jDs, fields)
+    javaTableEnv.registerTable(name, t)
     t
   }
 
@@ -217,27 +244,27 @@ case class StreamTableTestUtil() extends TableTestUtil {
       name: String,
       function: TableFunction[T])
     : TableFunction[T] = {
-    tEnv.registerFunction(name, function)
+    tableEnv.registerFunction(name, function)
     function
   }
 
   def addFunction(name: String, function: ScalarFunction): Unit = {
-    tEnv.registerFunction(name, function)
+    tableEnv.registerFunction(name, function)
   }
 
-  def addFunction[T:TypeInformation, ACC:TypeInformation](
+  def addFunction[T: TypeInformation, ACC: TypeInformation](
       name: String,
       function: AggregateFunction[T, ACC]): Unit = {
-    tEnv.registerFunction(name, function)
+    tableEnv.registerFunction(name, function)
   }
 
   def verifySql(query: String, expected: String): Unit = {
-    verifyTable(tEnv.sql(query), expected)
+    verifyTable(tableEnv.sql(query), expected)
   }
 
   def verifyTable(resultTable: Table, expected: String): Unit = {
     val relNode = resultTable.getRelNode
-    val optimized = tEnv.optimize(relNode, updatesAsRetraction = false)
+    val optimized = tableEnv.optimize(relNode, updatesAsRetraction = false)
     val actual = RelOptUtil.toString(optimized)
     assertEquals(
       expected.split("\n").map(_.trim).mkString("\n"),
@@ -247,11 +274,12 @@ case class StreamTableTestUtil() extends TableTestUtil {
   // the print methods are for debugging purposes only
   def printTable(resultTable: Table): Unit = {
     val relNode = resultTable.getRelNode
-    val optimized = tEnv.optimize(relNode, updatesAsRetraction = false)
+    val optimized = tableEnv.optimize(relNode, updatesAsRetraction = false)
     println(RelOptUtil.toString(optimized))
   }
 
   def printSql(query: String): Unit = {
-    printTable(tEnv.sql(query))
+    printTable(tableEnv.sql(query))
   }
+
 }
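
The new addJavaTable helpers are the hook for declaring the same table once through the Java Table API and once through the Scala one, which is what the consistency tests in the commit subject build on. A hypothetical usage sketch follows; the RowTypeInfo and the field-string syntax for the Java side are assumptions, not committed code:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestBase
import org.junit.Test

class JavaScalaConsistencySketch extends TableTestBase {

  @Test
  def testSameDeclarationThroughBothApis(): Unit = {
    val util = batchTestUtil()

    // Scala-side declaration: implicit TypeInformation plus symbol expressions.
    util.addTable[(Int, String)]("scalaT", 'id, 'name)

    // Java-side declaration: explicit TypeInformation plus a field string.
    val rowType = new RowTypeInfo(
      Array[TypeInformation[_]](Types.INT, Types.STRING),
      Array("id", "name"))
    util.addJavaTable(rowType, "javaT", "id, name")

    // Plans built on either table can now go through the same verifyTable /
    // verifySql expectations, keeping Java and Scala logical plans in sync.
  }
}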

