[FLINK-7802] [table] Fix projection of all fields in CsvTableSource.
This closes #4815.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/024d8f57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/024d8f57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/024d8f57
Branch: refs/heads/release-1.3
Commit: 024d8f5779406d88f44fda51aa47c2bdbc63a226
Parents: 28b65f9
Author: godfreyhe <godfreyhe@163.com>
Authored: Thu Oct 12 19:12:47 2017 +0800
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Thu Oct 19 16:23:52 2017 +0200
----------------------------------------------------------------------
.../flink/table/sources/CsvTableSource.scala | 12 ++++++++---
.../apache/flink/table/TableSourceTest.scala | 21 ++++++++++++++++++++
2 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/024d8f57/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
index 8a458ef..cfc8ada 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
@@ -105,8 +105,14 @@ class CsvTableSource(
/** Returns a copy of [[TableSource]] with ability to project fields */
override def projectFields(fields: Array[Int]): CsvTableSource = {
- val newFieldNames: Array[String] = fields.map(fieldNames(_))
- val newFieldTypes: Array[TypeInformation[_]] = fields.map(fieldTypes(_))
+ val (newFields, newFieldNames, newFieldTypes) = if (fields.nonEmpty) {
+ (fields, fields.map(fieldNames(_)), fields.map(fieldTypes(_)))
+ } else {
+ // reporting number of records only, we must read some columns to get row count.
+ // (e.g. SQL: select count(1) from csv_table)
+ // We choose the first column here.
+ (Array(0), Array(fieldNames.head), Array[TypeInformation[_]](fieldTypes.head))
+ }
val source = new CsvTableSource(path,
newFieldNames,
@@ -117,7 +123,7 @@ class CsvTableSource(
ignoreFirstLine,
ignoreComments,
lenient)
- source.selectedFields = fields
+ source.selectedFields = newFields
source
}
http://git-wip-us.apache.org/repos/asf/flink/blob/024d8f57/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
index 24a32de..5404721 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
@@ -118,6 +118,27 @@ class TableSourceTest extends TableTestBase {
}
@Test
+ def testBatchProjectableSourceFullProjection(): Unit = {
+ val (tableSource, tableName) = csvTable
+ val util = batchTestUtil()
+ val tEnv = util.tEnv
+
+ tEnv.registerTableSource(tableName, tableSource)
+
+ val result = tEnv
+ .scan(tableName)
+ .select(1)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ batchSourceTableNode(tableName, Array("first")),
+ term("select", "1 AS _c0")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
def testBatchFilterableWithoutPushDown(): Unit = {
val (tableSource, tableName) = filterableTableSource
val util = batchTestUtil()
|