flink-commits mailing list archives

From fhue...@apache.org
Subject [1/2] flink git commit: [FLINK-5280] [table] Refactor TableSource interface.
Date Tue, 10 Jan 2017 21:30:21 GMT
Repository: flink
Updated Branches:
  refs/heads/master d4d7cc326 -> 2af939a10


[FLINK-5280] [table] Refactor TableSource interface.

This closes #3039.
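
In short, the three schema methods (getNumberOfFields, getFieldsNames, getFieldTypes) are dropped from TableSource. The schema is now derived from getReturnType, and sources that need custom field names mix in the new DefinedFieldNames trait. A condensed sketch of the resulting contracts (see the full diff below):

    trait TableSource[T] {
      /** The only remaining method; field names and indices are derived from this type. */
      def getReturnType: TypeInformation[T]
    }

    /** Optional mix-in for sources with custom field names. */
    trait DefinedFieldNames {
      def getFieldNames: Array[String]
      def getFieldIndices: Array[Int]
    }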


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/38ded2bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/38ded2bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/38ded2bb

Branch: refs/heads/master
Commit: 38ded2bb00aeb5c9581fa7ef313e5b9f803f5c26
Parents: d4d7cc3
Author: Ivan Mushketyk <ivan.mushketik@gmail.com>
Authored: Thu Dec 22 21:26:34 2016 +0000
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue Jan 10 20:45:10 2017 +0100

----------------------------------------------------------------------
 .../connectors/kafka/KafkaTableSource.java      |  21 +--
 .../flink/table/api/BatchTableEnvironment.scala |   2 +-
 .../table/api/StreamTableEnvironment.scala      |   2 +-
 .../flink/table/api/TableEnvironment.scala      | 135 ++++++++++++++-----
 .../utils/UserDefinedFunctionUtils.scala        |  22 +--
 .../flink/table/plan/nodes/FlinkRel.scala       |  11 +-
 .../nodes/dataset/BatchTableSourceScan.scala    |   8 +-
 .../datastream/StreamTableSourceScan.scala      |   8 +-
 .../dataSet/BatchTableSourceScanRule.scala      |   6 +-
 ...ushProjectIntoBatchTableSourceScanRule.scala |   3 +-
 ...shProjectIntoStreamTableSourceScanRule.scala |   3 +-
 .../datastream/StreamTableSourceScanRule.scala  |   6 +-
 .../flink/table/plan/schema/FlinkTable.scala    |   6 +-
 .../table/plan/schema/TableSourceTable.scala    |  13 +-
 .../flink/table/sources/CsvTableSource.scala    |  11 +-
 .../flink/table/sources/DefinedFieldNames.scala |  35 +++++
 .../flink/table/sources/TableSource.scala       |  21 +--
 .../table/api/java/batch/TableSourceITCase.java |  11 +-
 .../flink/table/TableEnvironmentTest.scala      |   7 +-
 .../api/scala/batch/TableSourceITCase.scala     |  19 +++
 .../flink/table/utils/CommonTestData.scala      |  42 +++++-
 .../flink/api/scala/io/CsvInputFormatTest.scala |   2 -
 22 files changed, 259 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index 9a9c85d..dd32bdd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -19,12 +19,12 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Row;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Properties;
@@ -112,23 +112,8 @@ public abstract class KafkaTableSource implements StreamTableSource<Row> {
 	}
 
 	@Override
-	public int getNumberOfFields() {
-		return fieldNames.length;
-	}
-
-	@Override
-	public String[] getFieldsNames() {
-		return fieldNames;
-	}
-
-	@Override
-	public TypeInformation<?>[] getFieldTypes() {
-		return fieldTypes;
-	}
-
-	@Override
 	public TypeInformation<Row> getReturnType() {
-		return new RowTypeInfo(fieldTypes);
+		return new RowTypeInfo(fieldTypes, fieldNames);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 59cad80..4b9936d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -298,7 +298,7 @@ abstract class BatchTableEnvironment(
     * @return The [[DataSet]] that corresponds to the translated [[Table]].
     */
   protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = {
-    validateType(tpe)
+    TableEnvironment.validateType(tpe)
 
     logicalPlan match {
       case node: DataSetRel =>

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 4e43001..c08b502 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -308,7 +308,7 @@ abstract class StreamTableEnvironment(
   protected def translate[A]
       (logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataStream[A] = {
 
-    validateType(tpe)
+    TableEnvironment.validateType(tpe)
 
     logicalPlan match {
       case node: DataStreamRel =>

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 1c899e1..2dcfc95 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -18,8 +18,8 @@
 
 package org.apache.flink.table.api
 
-import _root_.java.util.concurrent.atomic.AtomicInteger
 import _root_.java.lang.reflect.Modifier
+import _root_.java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.calcite.config.Lex
 import org.apache.calcite.jdbc.CalciteSchema
@@ -32,7 +32,8 @@ import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable
 import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets}
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
@@ -48,6 +49,7 @@ import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
 import org.apache.flink.table.plan.cost.DataSetCostFactory
 import org.apache.flink.table.plan.schema.RelTable
 import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
 import org.apache.flink.table.validate.FunctionCatalog
 
 import _root_.scala.collection.JavaConverters._
@@ -336,48 +338,16 @@ abstract class TableEnvironment(val config: TableConfig) {
     frameworkConfig
   }
 
-  protected def validateType(typeInfo: TypeInformation[_]): Unit = {
-    val clazz = typeInfo.getTypeClass
-    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
-        !Modifier.isPublic(clazz.getModifiers) ||
-        clazz.getCanonicalName == null) {
-      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
-        s"static and globally accessible.")
-    }
-  }
-
   /**
     * Returns field names and field positions for a given [[TypeInformation]].
     *
-    * Field names are automatically extracted for
-    * [[org.apache.flink.api.common.typeutils.CompositeType]].
-    * The method fails if inputType is not a
-    * [[org.apache.flink.api.common.typeutils.CompositeType]].
-    *
     * @param inputType The TypeInformation to extract the field names and positions from.
     * @tparam A The type of the TypeInformation.
     * @return A tuple of two arrays holding the field names and corresponding field positions.
     */
   protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
-      (Array[String], Array[Int]) =
-  {
-    validateType(inputType)
-
-    val fieldNames: Array[String] = inputType match {
-      case t: TupleTypeInfo[A] => t.getFieldNames
-      case c: CaseClassTypeInfo[A] => c.getFieldNames
-      case p: PojoTypeInfo[A] => p.getFieldNames
-      case r: RowTypeInfo => r.getFieldNames
-      case tpe =>
-        throw new TableException(s"Type $tpe lacks explicit field naming")
-    }
-    val fieldIndexes = fieldNames.indices.toArray
-
-    if (fieldNames.contains("*")) {
-      throw new TableException("Field name can not be '*'.")
-    }
-
-    (fieldNames, fieldIndexes)
+  (Array[String], Array[Int]) = {
+    (TableEnvironment.getFieldNames(inputType), TableEnvironment.getFieldIndices(inputType))
   }
 
   /**
@@ -393,7 +363,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     inputType: TypeInformation[A],
     exprs: Array[Expression]): (Array[String], Array[Int]) = {
 
-    validateType(inputType)
+    TableEnvironment.validateType(inputType)
 
     val indexedNames: Array[(Int, String)] = inputType match {
       case a: AtomicType[A] =>
@@ -554,4 +524,95 @@ object TableEnvironment {
 
     new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+    * Returns field names for a given [[TypeInformation]].
+    *
+    * @param inputType The TypeInformation to extract the field names from.
+    * @tparam A The type of the TypeInformation.
+    * @return An array holding the field names.
+    */
+  def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = {
+    validateType(inputType)
+
+    val fieldNames: Array[String] = inputType match {
+      case t: CompositeType[_] => t.getFieldNames
+      case a: AtomicType[_] => Array("f0")
+      case tpe =>
+        throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
+          s"Type $tpe lacks explicit field naming")
+    }
+
+    if (fieldNames.contains("*")) {
+      throw new TableException("Field name can not be '*'.")
+    }
+
+    fieldNames
+  }
+
+  /**
+    * Validates that the class represented by the typeInfo is static and globally accessible.
+    *
+    * @param typeInfo The type to check.
+    * @throws TableException if the type does not meet these criteria.
+    */
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+    val clazz = typeInfo.getTypeClass
+    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+      !Modifier.isPublic(clazz.getModifiers) ||
+      clazz.getCanonicalName == null) {
+      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
+        s"static and globally accessible.")
+    }
+  }
+
+  /**
+    * Returns field indexes for a given [[TypeInformation]].
+    *
+    * @param inputType The TypeInformation to extract the field positions from.
+    * @return An array holding the field positions.
+    */
+  def getFieldIndices(inputType: TypeInformation[_]): Array[Int] = {
+    getFieldNames(inputType).indices.toArray
+  }
+
+  /**
+    * Returns field types for a given [[TypeInformation]].
+    *
+    * @param inputType The TypeInformation to extract field types from.
+    * @return An array holding the field types.
+    */
+  def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = {
+    validateType(inputType)
+
+    inputType match {
+      case t: CompositeType[_] => 0.until(t.getArity).map(t.getTypeAt(_)).toArray
+      case a: AtomicType[_] => Array(a.asInstanceOf[TypeInformation[_]])
+      case tpe =>
+        throw new TableException(s"Currently only CompositeType and AtomicType are supported.")
+    }
+  }
+
+  /**
+    * Returns field names for a given [[TableSource]].
+    *
+    * @param tableSource The TableSource to extract field names from.
+    * @tparam A The type of the TableSource.
+    * @return An array holding the field names.
+    */
+  def getFieldNames[A](tableSource: TableSource[A]): Array[String] = tableSource match {
+      case d: DefinedFieldNames => d.getFieldNames
+      case _ => TableEnvironment.getFieldNames(tableSource.getReturnType)
+    }
+
+  /**
+    * Returns field indices for a given [[TableSource]].
+    *
+    * @param tableSource The TableSource to extract field indices from.
+    * @tparam A The type of the TableSource.
+    * @return An array holding the field indices.
+    */
+  def getFieldIndices[A](tableSource: TableSource[A]): Array[Int] = tableSource match {
+    case d: DefinedFieldNames => d.getFieldIndices
+    case _ => TableEnvironment.getFieldIndices(tableSource.getReturnType)
+  }
 }
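
For illustration, a sketch of how the new static helpers behave (the case class and values here are hypothetical, assuming the implicit TypeInformation machinery of the Scala API):

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.TableEnvironment

    case class Person(name: String, age: Int)

    // CompositeType: names and types come from the type itself.
    TableEnvironment.getFieldNames(createTypeInformation[Person])   // Array("name", "age")
    TableEnvironment.getFieldIndices(createTypeInformation[Person]) // Array(0, 1)
    TableEnvironment.getFieldTypes(createTypeInformation[Person])   // Array(STRING_TYPE_INFO, INT_TYPE_INFO)

    // AtomicType: a single implicit field named "f0".
    TableEnvironment.getFieldNames(createTypeInformation[Int])      // Array("f0")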

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index aa3fab0..fa4668d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -29,7 +29,7 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction, UserDefinedFunction}
 import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
 import org.apache.flink.util.InstantiationUtil
@@ -268,23 +268,9 @@ object UserDefinedFunctionUtils {
   def getFieldInfo(inputType: TypeInformation[_])
     : (Array[String], Array[Int], Array[TypeInformation[_]]) = {
 
-    val fieldNames: Array[String] = inputType match {
-      case t: CompositeType[_] => t.getFieldNames
-      case a: AtomicType[_] => Array("f0")
-      case tpe =>
-        throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
-          s"Type $tpe lacks explicit field naming")
-    }
-    val fieldIndexes = fieldNames.indices.toArray
-    val fieldTypes: Array[TypeInformation[_]] = fieldNames.map { i =>
-      inputType match {
-        case t: CompositeType[_] => t.getTypeAt(i).asInstanceOf[TypeInformation[_]]
-        case a: AtomicType[_] => a.asInstanceOf[TypeInformation[_]]
-        case tpe =>
-          throw new TableException(s"Currently only CompositeType and AtomicType are supported.")
-      }
-    }
-    (fieldNames, fieldIndexes, fieldTypes)
+    (TableEnvironment.getFieldNames(inputType),
+    TableEnvironment.getFieldIndices(inputType),
+    TableEnvironment.getFieldTypes(inputType))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
index 835f316..9b844be 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
@@ -18,7 +18,9 @@
 
 package org.apache.flink.table.plan.nodes
 
-import org.apache.calcite.rel.`type`.RelDataType
+import java.util
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.flink.api.common.functions.MapFunction
@@ -103,10 +105,12 @@ trait FlinkRel {
 
   }
 
+
   private[flink] def estimateRowSize(rowType: RelDataType): Double = {
+    val fieldList = rowType.getFieldList
 
-    rowType.getFieldList.map(_.getType.getSqlTypeName).foldLeft(0) { (s, t) =>
-      t match {
+    fieldList.map(_.getType.getSqlTypeName).zipWithIndex.foldLeft(0) { (s, t) =>
+      t._1 match {
         case SqlTypeName.TINYINT => s + 1
         case SqlTypeName.SMALLINT => s + 2
         case SqlTypeName.INTEGER => s + 4
@@ -120,6 +124,7 @@ trait FlinkRel {
         case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 8
         case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 4
         case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12
+        case SqlTypeName.ROW => s + estimateRowSize(fieldList.get(t._2).getType()).asInstanceOf[Int]
         case _ => throw TableException(s"Unsupported data type encountered: $t")
       }
     }
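
Worked example (hypothetical schema, not from the patch): for a row type (id INTEGER, name VARCHAR, address ROW<zip SMALLINT, score DOUBLE>), the estimate is now 4 + 12 + (2 + 8) = 26, with the nested ROW column handled by the recursive case instead of falling through to the "Unsupported data type" exception as before.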

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
index 09cb180..73dddc6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.sources.BatchTableSource
@@ -38,7 +38,9 @@ class BatchTableSourceScan(
 
   override def deriveRowType() = {
     val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
+    flinkTypeFactory.buildRowDataType(
+      TableEnvironment.getFieldNames(tableSource),
+      TableEnvironment.getFieldTypes(tableSource.getReturnType))
   }
 
   override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
@@ -57,7 +59,7 @@ class BatchTableSourceScan(
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     super.explainTerms(pw)
-      .item("fields", tableSource.getFieldsNames.mkString(", "))
+      .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
   }
 
   override def translateToPlan(

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 702b6eb..7550593 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.sources.StreamTableSource
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
 class StreamTableSourceScan(
@@ -38,7 +38,9 @@ class StreamTableSourceScan(
 
   override def deriveRowType() = {
     val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
+    flinkTypeFactory.buildRowDataType(
+      TableEnvironment.getFieldNames(tableSource),
+      TableEnvironment.getFieldTypes(tableSource.getReturnType))
   }
 
   override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
@@ -57,7 +59,7 @@ class StreamTableSourceScan(
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     super.explainTerms(pw)
-      .item("fields", tableSource.getFieldsNames.mkString(", "))
+      .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
   }
 
   override def translateToPlan(

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
index d699585..d9f5bf8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
@@ -39,9 +39,9 @@ class BatchTableSourceScanRule
   /** Rule must only match if TableScan targets a [[BatchTableSource]] */
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
-    val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable])
+    val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
     dataSetTable match {
-      case tst: TableSourceTable =>
+      case tst: TableSourceTable[_] =>
         tst.tableSource match {
           case _: BatchTableSource[_] =>
             true
@@ -57,7 +57,7 @@ class BatchTableSourceScanRule
     val scan: TableScan = rel.asInstanceOf[TableScan]
     val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
 
-    val tableSource = scan.getTable.unwrap(classOf[TableSourceTable]).tableSource
+    val tableSource = scan.getTable.unwrap(classOf[TableSourceTable[_]]).tableSource
       .asInstanceOf[BatchTableSource[_]]
     new BatchTableSourceScan(
       rel.getCluster,

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
index 7adec48..70639b7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.plan.rules.dataSet
 
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
 import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._
 import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource}
@@ -47,7 +48,7 @@ class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
     val usedFields: Array[Int] = extractRefInputFields(calc.calcProgram)
 
     // if no fields can be projected, we keep the original plan.
-    if (scan.tableSource.getNumberOfFields != usedFields.length) {
+    if (TableEnvironment.getFieldNames(scan.tableSource).length != usedFields.length) {
       val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
       val newTableSource = originTableSource.projectFields(usedFields)
       val newScan = new BatchTableSourceScan(

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
index 654fb8f..a6d4b82 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.plan.rules.datastream
 
 import org.apache.calcite.plan.RelOptRule._
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
 import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._
 import org.apache.flink.table.sources.{ProjectableTableSource, StreamTableSource}
@@ -48,7 +49,7 @@ class PushProjectIntoStreamTableSourceScanRule extends RelOptRule(
     val usedFields = extractRefInputFields(calc.calcProgram)
 
     // if no fields can be projected, we keep the original plan
-    if (scan.tableSource.getNumberOfFields != usedFields.length) {
+    if (TableEnvironment.getFieldNames(scan.tableSource).length != usedFields.length) {
       val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
       val newTableSource = originTableSource.projectFields(usedFields)
       val newScan = new StreamTableSourceScan(

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
index 296c86b..c6c7c59 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
@@ -40,9 +40,9 @@ class StreamTableSourceScanRule
   /** Rule must only match if TableScan targets a [[StreamTableSource]] */
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
-    val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable])
+    val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
     dataSetTable match {
-      case tst: TableSourceTable =>
+      case tst: TableSourceTable[_] =>
         tst.tableSource match {
           case _: StreamTableSource[_] =>
             true
@@ -59,7 +59,7 @@ class StreamTableSourceScanRule
     val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
 
     // The original registered table source
-    val table: TableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])
+    val table = scan.getTable.unwrap(classOf[TableSourceTable[_]])
     val tableSource: StreamTableSource[_] = table.tableSource.asInstanceOf[StreamTableSource[_]]
 
     new StreamTableSourceScan(

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
index 8bb5c81..971f54f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
@@ -22,7 +22,7 @@ import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.{TableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 
 abstract class FlinkTable[T](
@@ -44,14 +44,14 @@ abstract class FlinkTable[T](
 
   val fieldTypes: Array[TypeInformation[_]] =
     typeInfo match {
-      case cType: CompositeType[T] =>
+      case cType: CompositeType[_] =>
         if (fieldNames.length != cType.getArity) {
           throw new TableException(
           s"Arity of type (" + cType.getFieldNames.deep + ") " +
             "not equal to number of field names " + fieldNames.deep + ".")
         }
         fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
-      case aType: AtomicType[T] =>
+      case aType: AtomicType[_] =>
         if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
           throw new TableException(
             "Non-composite input type may have only a single field and its index must be
0.")

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index 0f55daf..4f82f5e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -18,13 +18,12 @@
 
 package org.apache.flink.table.plan.schema
 
-import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.sources.TableSource
-import org.apache.flink.types.Row
 
 /** Table which defines an external table via a [[TableSource]] */
-class TableSourceTable(val tableSource: TableSource[_])
-  extends FlinkTable[Row](
-    typeInfo = new RowTypeInfo(tableSource.getFieldTypes: _*),
-    fieldIndexes = 0.until(tableSource.getNumberOfFields).toArray,
-    fieldNames = tableSource.getFieldsNames)
+class TableSourceTable[T](val tableSource: TableSource[T])
+  extends FlinkTable[T](
+    typeInfo = tableSource.getReturnType,
+    fieldIndexes = TableEnvironment.getFieldIndices(tableSource),
+    fieldNames = TableEnvironment.getFieldNames(tableSource))

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
index 20e8bb9..f59a331 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
@@ -73,7 +73,7 @@ class CsvTableSource(
     throw TableException("Number of field names and field types must be equal.")
   }
 
-  private val returnType = new RowTypeInfo(fieldTypes: _*)
+  private val returnType = new RowTypeInfo(fieldTypes, fieldNames)
 
   private var selectedFields: Array[Int] = fieldTypes.indices.toArray
 
@@ -87,15 +87,6 @@ class CsvTableSource(
     execEnv.createInput(createCsvInput(), returnType)
   }
 
-  /** Returns the types of the table fields. */
-  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
-
-  /** Returns the names of the table fields. */
-  override def getFieldsNames: Array[String] = fieldNames
-
-  /** Returns the number of fields of the table. */
-  override def getNumberOfFields: Int = fieldNames.length
-
   /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */
   override def getReturnType: RowTypeInfo = returnType
 

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldNames.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldNames.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldNames.scala
new file mode 100644
index 0000000..bead3e9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldNames.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+/**
+  * Trait that defines custom field names and their indices in the underlying
+  * data type.
+  *
+  * Should be implemented together with the [[TableSource]] trait.
+  */
+trait DefinedFieldNames {
+
+  /** Returns the names of the table fields. */
+  def getFieldNames: Array[String]
+
+  /** Returns the indices of the table fields. */
+  def getFieldIndices: Array[Int]
+
+}
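
For illustration, a hypothetical source mixing the trait into a BatchTableSource (class and field names are invented, and Row.of is assumed to be available on org.apache.flink.types.Row):

    import java.util.Arrays

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
    import org.apache.flink.table.sources.{BatchTableSource, DefinedFieldNames}
    import org.apache.flink.types.Row

    class TwoColumnSource extends BatchTableSource[Row] with DefinedFieldNames {

      override def getReturnType: TypeInformation[Row] =
        new RowTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

      override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] =
        execEnv.fromCollection(Arrays.asList(Row.of(Long.box(1L), "one")), getReturnType)

      // Without the mix-in, names would be derived from the RowTypeInfo ("f0", "f1").
      override def getFieldNames: Array[String] = Array("id", "name")
      override def getFieldIndices: Array[Int] = Array(0, 1)
    }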

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
index 9d4ba68..a3eb03d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
@@ -19,22 +19,23 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field names and types.
+/** Defines an external table by providing schema information and is used to produce a
+  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource, one needs to implement [[TableSource#getReturnType]]. In this
+  * case, field names and field indices are derived from the returned type.
+  *
+  * If custom field names are required, one needs to additionally implement
+  * the [[DefinedFieldNames]] trait.
   *
   * @tparam T The return type of the [[TableSource]].
   */
 trait TableSource[T] {
 
-  /** Returns the number of fields of the table. */
-  def getNumberOfFields: Int
-
-  /** Returns the names of the table fields. */
-  def getFieldsNames: Array[String]
-
-  /** Returns the types of the table fields. */
-  def getFieldTypes: Array[TypeInformation[_]]
-
   /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */
   def getReturnType: TypeInformation[T]
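
Note that a source can also carry custom names without the DefinedFieldNames mix-in by encoding them in its return type, as KafkaTableSource and CsvTableSource now do; a hypothetical sketch:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, STRING_TYPE_INFO}
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.table.api.TableEnvironment

    // A RowTypeInfo built with explicit names exposes them to the planner;
    // one built from types alone falls back to "f0", "f1", ...
    val named = new RowTypeInfo(
      Array[TypeInformation[_]](INT_TYPE_INFO, STRING_TYPE_INFO),
      Array("id", "name"))

    TableEnvironment.getFieldNames(named)   // Array("id", "name")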
 

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
index e5777f2..d67725e 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
@@ -18,17 +18,15 @@
 
 package org.apache.flink.table.api.java.batch;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.table.utils.CommonTestData;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
 import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.utils.CommonTestData;
+import org.apache.flink.types.Row;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -91,5 +89,4 @@ public class TableSourceITCase extends TableProgramsTestBase {
 
 		compareResultAsText(results, expected);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index b90de97..f91aee9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -68,9 +68,12 @@ class TableEnvironmentTest {
     fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
   }
 
-  @Test(expected = classOf[TableException])
+  @Test
   def testGetFieldInfoAtomic(): Unit = {
-    tEnv.getFieldInfo(atomicType)
+    val fieldInfo = tEnv.getFieldInfo(atomicType)
+
+    fieldInfo._1.zip(Array("f0")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
index a9218ac..f5ab352 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.utils.CommonTestData
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -86,4 +87,22 @@ class TableSourceITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testNestedBatchTableSourceSQL(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    val nestedTable = CommonTestData.getNestedTableSource
+
+    tableEnv.registerTableSource("NestedPersons", nestedTable)
+
+    val result = tableEnv.sql("SELECT NestedPersons.firstName, NestedPersons.lastName," +
+        "NestedPersons.address.street, NestedPersons.address.city AS city " +
+        "FROM NestedPersons " +
+        "WHERE NestedPersons.address.city LIKE 'Dublin'").collect()
+
+    val expected = "Bob,Taylor,Pearse Street,Dublin"
+
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
index 349b369..6e4859b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
@@ -19,9 +19,16 @@
 package org.apache.flink.table.utils
 
 import java.io.{File, FileOutputStream, OutputStreamWriter}
+import java.util
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.table.sources.CsvTableSource
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo, TypeExtractor}
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.sources.{BatchTableSource, CsvTableSource, TableSource}
+import org.apache.flink.api.scala._
 
 object CommonTestData {
 
@@ -60,4 +67,35 @@ object CommonTestData {
       ignoreComments = "%"
     )
   }
+
+  def getNestedTableSource: BatchTableSource[Person] = {
+    new BatchTableSource[Person] {
+      override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Person] = {
+        val executionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+        executionEnvironment.fromCollection(
+          util.Arrays.asList(
+            new Person("Mike", "Smith", new Address("5th Ave", "New-York")),
+            new Person("Sally", "Miller", new Address("Potsdamer Platz", "Berlin")),
+            new Person("Bob", "Taylor", new Address("Pearse Street", "Dublin"))),
+          getReturnType
+        )
+      }
+
+      override def getReturnType: TypeInformation[Person] = {
+        TypeExtractor.getForClass(classOf[Person])
+      }
+    }
+  }
+
+  class Person(var firstName: String, var lastName: String, var address: Address) {
+    def this() {
+      this(null, null, null)
+    }
+  }
+
+  class Address(var street: String, var city: String) {
+    def this() {
+      this(null, null)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
index 539a257..925ee78 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
@@ -19,8 +19,6 @@ package org.apache.flink.api.scala.io
 
 import java.io.{File, FileOutputStream, FileWriter, OutputStreamWriter}
 
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.io.PojoCsvInputFormat
 import org.apache.flink.api.java.io.TupleCsvInputFormat
 import org.apache.flink.api.java.io.CsvInputFormatTest.TwitterPOJO

