flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [2/2] flink git commit: [FLINK-7802] [table] Fix projection of all fields in CsvTableSource.
Date Thu, 19 Oct 2017 15:23:31 GMT
[FLINK-7802] [table] Fix projection of all fields in CsvTableSource.

This closes #4815.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/024d8f57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/024d8f57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/024d8f57

Branch: refs/heads/release-1.3
Commit: 024d8f5779406d88f44fda51aa47c2bdbc63a226
Parents: 28b65f9
Author: godfreyhe <godfreyhe@163.com>
Authored: Thu Oct 12 19:12:47 2017 +0800
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Thu Oct 19 16:23:52 2017 +0200

----------------------------------------------------------------------
 .../flink/table/sources/CsvTableSource.scala    | 12 ++++++++---
 .../apache/flink/table/TableSourceTest.scala    | 21 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/024d8f57/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
index 8a458ef..cfc8ada 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
@@ -105,8 +105,14 @@ class CsvTableSource(
   /** Returns a copy of [[TableSource]] with ability to project fields */
   override def projectFields(fields: Array[Int]): CsvTableSource = {
 
-    val newFieldNames: Array[String] = fields.map(fieldNames(_))
-    val newFieldTypes: Array[TypeInformation[_]] = fields.map(fieldTypes(_))
+    val (newFields, newFieldNames, newFieldTypes) = if (fields.nonEmpty) {
+      (fields, fields.map(fieldNames(_)), fields.map(fieldTypes(_)))
+    } else {
+      // reporting number of records only, we must read some columns to get row count.
+      // (e.g. SQL: select count(1) from csv_table)
+      // We choose the first column here.
+      (Array(0), Array(fieldNames.head), Array[TypeInformation[_]](fieldTypes.head))
+    }
 
     val source = new CsvTableSource(path,
       newFieldNames,
@@ -117,7 +123,7 @@ class CsvTableSource(
       ignoreFirstLine,
       ignoreComments,
       lenient)
-    source.selectedFields = fields
+    source.selectedFields = newFields
     source
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/024d8f57/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
index 24a32de..5404721 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
@@ -118,6 +118,27 @@ class TableSourceTest extends TableTestBase {
   }
 
   @Test
+  def testBatchProjectableSourceFullProjection(): Unit = {
+    val (tableSource, tableName) = csvTable
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val result = tEnv
+      .scan(tableName)
+      .select(1)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchSourceTableNode(tableName, Array("first")),
+      term("select", "1 AS _c0")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
   def testBatchFilterableWithoutPushDown(): Unit = {
     val (tableSource, tableName) = filterableTableSource
     val util = batchTestUtil()


Mime
View raw message