flink-commits mailing list archives

From fhue...@apache.org
Subject [3/4] flink git commit: [FLINK-7548] [table] Refactor TableSource interface.
Date Mon, 30 Oct 2017 13:20:49 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/BatchTableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/BatchTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/BatchTableSourceTable.scala
new file mode 100644
index 0000000..14e8cb1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/BatchTableSourceTable.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.{BatchTableSource, TableSourceUtil}
+
+class BatchTableSourceTable[T](
+    tableSource: BatchTableSource[T],
+    statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
+  extends TableSourceTable[T](
+    tableSource,
+    statistic) {
+
+  TableSourceUtil.validateTableSource(tableSource)
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+    TableSourceUtil.getRelDataType(
+      tableSource,
+      None,
+      streaming = false,
+      typeFactory.asInstanceOf[FlinkTypeFactory])
+  }
+}
+
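
For context, a hedged sketch of how the new table class is instantiated; the CsvTableSource path and fields below are made up, and the planner normally constructs these tables internally when a source is registered:

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.api.Types
    import org.apache.flink.table.plan.schema.BatchTableSourceTable
    import org.apache.flink.table.sources.CsvTableSource

    // hypothetical CSV-backed source
    val orders = new CsvTableSource(
      "/tmp/orders.csv",
      Array("id", "amount"),
      Array[TypeInformation[_]](Types.LONG, Types.DOUBLE))

    // the constructor eagerly runs TableSourceUtil.validateTableSource,
    // so an inconsistent source fails at registration time
    val catalogTable = new BatchTableSourceTable(orders)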

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala
index 0ce2a87..c1515b1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala
@@ -26,5 +26,4 @@ class DataSetTable[T](
     override val fieldIndexes: Array[Int],
     override val fieldNames: Array[String],
     override val statistic: FlinkStatistic = FlinkStatistic.of(TableStats(1000L)))
-  extends FlinkTable[T](dataSet.getType, fieldIndexes, fieldNames, statistic) {
-}
+  extends InlineTable[T](dataSet.getType, fieldIndexes, fieldNames, statistic)

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
index b7021e2..6de962c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
@@ -26,6 +26,4 @@ class DataStreamTable[T](
     override val fieldIndexes: Array[Int],
     override val fieldNames: Array[String],
     override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
-  extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames, statistic) {
-
-}
+  extends InlineTable[T](dataStream.getType, fieldIndexes, fieldNames, statistic)

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
deleted file mode 100644
index c76532f..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.schema
-
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.calcite.schema.Statistic
-import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
-
-abstract class FlinkTable[T](
-    val typeInfo: TypeInformation[T],
-    val fieldIndexes: Array[Int],
-    val fieldNames: Array[String],
-    val statistic: FlinkStatistic)
-  extends AbstractTable {
-
-  if (fieldIndexes.length != fieldNames.length) {
-    throw new TableException(
-      s"Number of field names and field indexes must be equal.\n" +
-        s"Number of names is ${fieldNames.length}, number of indexes is ${fieldIndexes.length}.\n" +
-        s"List of column names: ${fieldNames.mkString("[", ", ", "]")}.\n" +
-        s"List of column indexes: ${fieldIndexes.mkString("[", ", ", "]")}.")
-  }
-
-  // check uniqueness of field names
-  if (fieldNames.length != fieldNames.toSet.size) {
-    val duplicateFields = fieldNames
-      // count occurences of field names
-      .groupBy(identity).mapValues(_.length)
-      // filter for occurences > 1 and map to field name
-      .filter(g => g._2 > 1).keys
-
-    throw new TableException(
-      s"Field names must be unique.\n" +
-        s"List of duplicate fields: ${duplicateFields.mkString("[", ", ", "]")}.\n" +
-        s"List of all fields: ${fieldNames.mkString("[", ", ", "]")}.")
-  }
-
-  val fieldTypes: Array[TypeInformation[_]] =
-    typeInfo match {
-      case cType: CompositeType[_] =>
-        // it is ok to leave out fields
-        if (fieldIndexes.count(_ >= 0) > cType.getArity) {
-          throw new TableException(
-          s"Arity of type (" + cType.getFieldNames.deep + ") " +
-            "must not be greater than number of field names " + fieldNames.deep + ".")
-        }
-        fieldIndexes.map {
-          case TimeIndicatorTypeInfo.ROWTIME_MARKER => TimeIndicatorTypeInfo.ROWTIME_INDICATOR
-          case TimeIndicatorTypeInfo.PROCTIME_MARKER => TimeIndicatorTypeInfo.PROCTIME_INDICATOR
-          case i => cType.getTypeAt(i).asInstanceOf[TypeInformation[_]]}
-      case aType: AtomicType[_] =>
-        if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
-          throw new TableException(
-            "Non-composite input type may have only a single field and its index must be 0.")
-        }
-        Array(aType)
-    }
-
-  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
-    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
-    flinkTypeFactory.buildLogicalRowType(fieldNames, fieldTypes)
-  }
-
-  /**
-    * Returns statistics of current table
-    *
-    * @return statistics of current table
-    */
-  override def getStatistic: Statistic = statistic
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/InlineTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/InlineTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/InlineTable.scala
new file mode 100644
index 0000000..22d6151
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/InlineTable.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Statistic
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.api.{TableException, Types}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+
+abstract class InlineTable[T](
+    val typeInfo: TypeInformation[T],
+    val fieldIndexes: Array[Int],
+    val fieldNames: Array[String],
+    val statistic: FlinkStatistic)
+  extends AbstractTable {
+
+  if (fieldIndexes.length != fieldNames.length) {
+    throw new TableException(
+      s"Number of field names and field indexes must be equal.\n" +
+        s"Number of names is ${fieldNames.length}, number of indexes is ${fieldIndexes.length}.\n" +
+        s"List of column names: ${fieldNames.mkString("[", ", ", "]")}.\n" +
+        s"List of column indexes: ${fieldIndexes.mkString("[", ", ", "]")}.")
+  }
+
+  // check uniqueness of field names
+  if (fieldNames.length != fieldNames.toSet.size) {
+    val duplicateFields = fieldNames
+      // count occurrences of field names
+      .groupBy(identity).mapValues(_.length)
+      // filter for occurrences > 1 and map to field name
+      .filter(g => g._2 > 1).keys
+
+    throw new TableException(
+      s"Field names must be unique.\n" +
+        s"List of duplicate fields: ${duplicateFields.mkString("[", ", ", "]")}.\n" +
+        s"List of all fields: ${fieldNames.mkString("[", ", ", "]")}.")
+  }
+
+  val fieldTypes: Array[TypeInformation[_]] =
+    typeInfo match {
+      case cType: CompositeType[_] =>
+        // it is ok to leave out fields
+        if (fieldIndexes.count(_ >= 0) > cType.getArity) {
+          throw new TableException(
+            s"Arity of type (" + cType.getFieldNames.deep + ") " +
+              "must not be less than the number of field names " + fieldNames.deep + ".")
+        }
+        fieldIndexes.map {
+          case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER =>
+            TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+          case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER =>
+            TimeIndicatorTypeInfo.PROCTIME_INDICATOR
+          case TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER =>
+            Types.SQL_TIMESTAMP
+          case TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER =>
+            Types.SQL_TIMESTAMP
+          case i => cType.getTypeAt(i).asInstanceOf[TypeInformation[_]]}
+      case aType: AtomicType[_] =>
+        var cnt = 0
+        val types = fieldIndexes.map {
+          case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER =>
+            TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+          case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER =>
+            TimeIndicatorTypeInfo.PROCTIME_INDICATOR
+          case TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER =>
+            Types.SQL_TIMESTAMP
+          case TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER =>
+            Types.SQL_TIMESTAMP
+          case _ =>
+            cnt += 1
+            aType.asInstanceOf[TypeInformation[_]]
+        }
+        // ensure that the atomic type is matched at most once.
+        if (cnt > 1) {
+          throw new TableException(
+            "Non-composite input type may have only a single field and its index must be 0.")
+        } else {
+          types
+        }
+    }
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
+    flinkTypeFactory.buildLogicalRowType(fieldNames, fieldTypes)
+  }
+
+  /**
+    * Returns statistics of current table
+    *
+    * @return statistics of current table
+    */
+  override def getStatistic: Statistic = statistic
+
+}
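
The marker handling in fieldTypes is easiest to see from the caller's side. A hedged sketch, assuming a Scala DataStream of (Long, String) pairs; the field names are made up:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.plan.schema.DataStreamTable
    import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromElements((1L, "a"), (2L, "b"))

    // the marker index resolves to PROCTIME_INDICATOR on the stream side;
    // a batch marker would map to SQL_TIMESTAMP instead
    val table = new DataStreamTable(
      stream.javaStream,
      Array(0, 1, TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER),
      Array("id", "name", "proctime"))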

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
index e94b4f2..9e82313 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
@@ -19,139 +19,24 @@
 package org.apache.flink.table.plan.schema
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource, TableSource}
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import org.apache.flink.table.sources.{StreamTableSource, TableSourceUtil}
 
 class StreamTableSourceTable[T](
-    override val tableSource: TableSource[T],
-    override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
+    tableSource: StreamTableSource[T],
+    statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
   extends TableSourceTable[T](
     tableSource,
-    StreamTableSourceTable.adjustFieldIndexes(tableSource),
-    StreamTableSourceTable.adjustFieldNames(tableSource),
     statistic) {
 
-  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
-    val fieldTypes = StreamTableSourceTable.adjustFieldTypes(tableSource)
-
-    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
-    flinkTypeFactory.buildLogicalRowType(
-      this.fieldNames,
-      fieldTypes)
-  }
-
-}
-
-object StreamTableSourceTable {
-
-  private def adjustFieldIndexes(tableSource: TableSource[_]): Array[Int] = {
-    val (_, proctime) = getTimeIndicators(tableSource)
-
-    val original = TableEnvironment.getFieldIndices(tableSource)
-
-    // append proctime marker
-    if (proctime.isDefined) {
-      original :+ TimeIndicatorTypeInfo.PROCTIME_MARKER
-    } else {
-      original
-    }
-  }
-
-  private def adjustFieldNames(tableSource: TableSource[_]): Array[String] = {
-    val (_, proctime) = getTimeIndicators(tableSource)
-
-    val original = TableEnvironment.getFieldNames(tableSource)
-
-    // append proctime field
-    if (proctime.isDefined) {
-      original :+ proctime.get
-    } else {
-      original
-    }
-  }
-
-  private def adjustFieldTypes(tableSource: TableSource[_]): Array[TypeInformation[_]] = {
-    val (rowtime, proctime) = StreamTableSourceTable.getTimeIndicators(tableSource)
+  TableSourceUtil.validateTableSource(tableSource)
 
-    val original = TableEnvironment.getFieldTypes(tableSource.getReturnType)
-
-    // update rowtime type
-    val withRowtime = if (rowtime.isDefined) {
-      // replace field type by RowtimeIndicator type
-      val rowtimeIdx = TableEnvironment.getFieldNames(tableSource).indexOf(rowtime.get)
-      original.patch(rowtimeIdx, Seq(TimeIndicatorTypeInfo.ROWTIME_INDICATOR), 1)
-    } else {
-      original
-    }
-
-    // append proctime type
-    val withProctime = if (proctime.isDefined) {
-      withRowtime :+ TimeIndicatorTypeInfo.PROCTIME_INDICATOR
-    } else {
-      withRowtime
-    }
-
-    withProctime.asInstanceOf[Array[TypeInformation[_]]]
-  }
-
-  private def getTimeIndicators(tableSource: TableSource[_]): (Option[String], Option[String]) = {
-
-    val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
-    val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
-
-    val rowtime: Option[String] = tableSource match {
-      case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute == null =>
-        None
-      case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute.trim.equals("") =>
-        throw TableException("The name of the rowtime attribute must not be empty.")
-
-      case timeSource: DefinedRowtimeAttribute =>
-        // validate the rowtime field exists and is of type Long or Timestamp
-        val rowtimeAttribute = timeSource.getRowtimeAttribute
-        val rowtimeIdx = fieldNames.indexOf(rowtimeAttribute)
-
-        if (rowtimeIdx < 0) {
-          throw TableException(
-            s"Rowtime field '$rowtimeAttribute' is not present in TableSource. " +
-            s"Available fields are ${fieldNames.mkString("[", ", ", "]") }.")
-        }
-        val fieldType = fieldTypes(rowtimeIdx)
-        if (fieldType != Types.LONG && fieldType != Types.SQL_TIMESTAMP) {
-          throw TableException(
-            s"Rowtime field '$rowtimeAttribute' must be of type Long or Timestamp " +
-            s"but of type ${fieldTypes(rowtimeIdx)}.")
-        }
-        Some(rowtimeAttribute)
-      case _ =>
-        None
-    }
-
-    val proctime: Option[String] = tableSource match {
-      case timeSource : DefinedProctimeAttribute if timeSource.getProctimeAttribute == null =>
-        None
-      case timeSource: DefinedProctimeAttribute
-        if timeSource.getProctimeAttribute.trim.equals("") =>
-        throw TableException("The name of the rowtime attribute must not be empty.")
-      case timeSource: DefinedProctimeAttribute =>
-        val proctimeAttribute = timeSource.getProctimeAttribute
-        Some(proctimeAttribute)
-      case _ =>
-        None
-    }
-    (rowtime, proctime)
-  }
-
-  def deriveRowTypeOfTableSource(
-    tableSource: StreamTableSource[_],
-    typeFactory: FlinkTypeFactory): RelDataType = {
-
-    val fieldNames = adjustFieldNames(tableSource)
-    val fieldTypes = adjustFieldTypes(tableSource)
-
-    typeFactory.buildLogicalRowType(fieldNames, fieldTypes)
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+    TableSourceUtil.getRelDataType(
+      tableSource,
+      None,
+      streaming = true,
+      typeFactory.asInstanceOf[FlinkTypeFactory])
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index 2f0ba1a..048e862 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -18,30 +18,22 @@
 
 package org.apache.flink.table.plan.schema
 
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.calcite.schema.Statistic
+import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.flink.table.plan.stats.FlinkStatistic
 import org.apache.flink.table.sources.TableSource
 
 /** Table which defines an external table via a [[TableSource]] */
-class TableSourceTable[T](
+abstract class TableSourceTable[T](
     val tableSource: TableSource[T],
-    fieldIndexes: Array[Int],
-    fieldNames: Array[String],
-    override val statistic: FlinkStatistic)
-  extends FlinkTable[T](
-    typeInfo = tableSource.getReturnType,
-    fieldIndexes,
-    fieldNames,
-    statistic) {
+    val statistic: FlinkStatistic)
+  extends AbstractTable {
 
-  def this(
-    tableSource: TableSource[T],
-    statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) {
+  /**
+    * Returns statistics of current table
+    *
+    * @return statistics of current table
+    */
+  override def getStatistic: Statistic = statistic
 
-    this(
-      tableSource,
-      TableEnvironment.getFieldIndices(tableSource),
-      TableEnvironment.getFieldNames(tableSource),
-      statistic)
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
index a524816..d5a5f36 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
@@ -25,9 +25,11 @@ import java.util.{Collections, List}
 import org.apache.calcite.rel.{RelCollation, RelDistribution, RelReferentialConstraint}
 import org.apache.calcite.schema.Statistic
 import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.schema.InlineTable
 
 /**
-  * The class provides statistics for a [[org.apache.flink.table.plan.schema.FlinkTable]].
+  * The class provides statistics for a [[InlineTable]] or [[TableSourceTable]].
   *
   * @param tableStats The table statistics.
   */

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
index cfc8ada..c443a69 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.{TableException, TableSchema}
 
 import scala.collection.mutable
 
@@ -38,6 +38,8 @@ import scala.collection.mutable
   * @param path The path to the CSV file.
   * @param fieldNames The names of the table fields.
   * @param fieldTypes The types of the table fields.
+  * @param selectedFields The indexes of the fields which are read and returned by the
+  *                       table source.
   * @param fieldDelim The field delimiter, "," by default.
   * @param rowDelim The row delimiter, "\n" by default.
   * @param quoteCharacter An optional quote character for String values, null by default.
@@ -45,16 +47,17 @@ import scala.collection.mutable
   * @param ignoreComments An optional prefix to indicate comments, null by default.
  * @param lenient Flag to skip records with parse errors instead of failing, false by default.
   */
-class CsvTableSource(
+class CsvTableSource private (
     private val path: String,
     private val fieldNames: Array[String],
     private val fieldTypes: Array[TypeInformation[_]],
-    private val fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
-    private val rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
-    private val quoteCharacter: Character = null,
-    private val ignoreFirstLine: Boolean = false,
-    private val ignoreComments: String = null,
-    private val lenient: Boolean = false)
+    private val selectedFields: Array[Int],
+    private val fieldDelim: String,
+    private val rowDelim: String,
+    private val quoteCharacter: Character,
+    private val ignoreFirstLine: Boolean,
+    private val ignoreComments: String,
+    private val lenient: Boolean)
   extends BatchTableSource[Row]
   with StreamTableSource[Row]
   with ProjectableTableSource[Row] {
@@ -66,18 +69,59 @@ class CsvTableSource(
     * @param path The path to the CSV file.
     * @param fieldNames The names of the table fields.
     * @param fieldTypes The types of the table fields.
+    * @param fieldDelim The field delimiter, "," by default.
+    * @param rowDelim The row delimiter, "\n" by default.
+    * @param quoteCharacter An optional quote character for String values, null by default.
+    * @param ignoreFirstLine Flag to ignore the first line, false by default.
+    * @param ignoreComments An optional prefix to indicate comments, null by default.
+    * @param lenient Flag to skip records with parse errors instead of failing, false by default.
     */
-  def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) =
+  def this(
+    path: String,
+    fieldNames: Array[String],
+    fieldTypes: Array[TypeInformation[_]],
+    fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
+    rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
+    quoteCharacter: Character = null,
+    ignoreFirstLine: Boolean = false,
+    ignoreComments: String = null,
+    lenient: Boolean = false) = {
+
+    this(
+      path,
+      fieldNames,
+      fieldTypes,
+      fieldTypes.indices.toArray, // initially, all fields are returned
+      fieldDelim,
+      rowDelim,
+      quoteCharacter,
+      ignoreFirstLine,
+      ignoreComments,
+      lenient)
+
+  }
+
+  /**
+    * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
+    * (logically) unlimited number of fields.
+    *
+    * @param path The path to the CSV file.
+    * @param fieldNames The names of the table fields.
+    * @param fieldTypes The types of the table fields.
+    */
+  def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) = {
     this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
       CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
+  }
 
   if (fieldNames.length != fieldTypes.length) {
     throw TableException("Number of field names and field types must be equal.")
   }
 
-  private val returnType = new RowTypeInfo(fieldTypes, fieldNames)
+  private val selectedFieldTypes = selectedFields.map(fieldTypes(_))
+  private val selectedFieldNames = selectedFields.map(fieldNames(_))
 
-  private var selectedFields: Array[Int] = fieldTypes.indices.toArray
+  private val returnType: RowTypeInfo = new RowTypeInfo(selectedFieldTypes, selectedFieldNames)
 
   /**
     * Returns the data of the table as a [[DataSet]] of [[Row]].
@@ -102,35 +146,32 @@ class CsvTableSource(
     streamExecEnv.createInput(createCsvInput(), returnType)
   }
 
+  /** Returns the schema of the produced table. */
+  override def getTableSchema = new TableSchema(fieldNames, fieldTypes)
+
   /** Returns a copy of [[TableSource]] with ability to project fields */
   override def projectFields(fields: Array[Int]): CsvTableSource = {
 
-    val (newFields, newFieldNames, newFieldTypes) = if (fields.nonEmpty) {
-      (fields, fields.map(fieldNames(_)), fields.map(fieldTypes(_)))
-    } else {
-      // reporting number of records only, we must read some columns to get row count.
-      // (e.g. SQL: select count(1) from csv_table)
-      // We choose the first column here.
-      (Array(0), Array(fieldNames.head), Array[TypeInformation[_]](fieldTypes.head))
-    }
+    val selectedFields = if (fields.isEmpty) Array(0) else fields
 
-    val source = new CsvTableSource(path,
-      newFieldNames,
-      newFieldTypes,
+    new CsvTableSource(
+      path,
+      fieldNames,
+      fieldTypes,
+      selectedFields,
       fieldDelim,
       rowDelim,
       quoteCharacter,
       ignoreFirstLine,
       ignoreComments,
       lenient)
-    source.selectedFields = newFields
-    source
   }
 
   private def createCsvInput(): RowCsvInputFormat = {
     val inputFormat = new RowCsvInputFormat(
       new Path(path),
-      fieldTypes,
+      selectedFieldTypes,
       rowDelim,
       fieldDelim,
       selectedFields)
@@ -162,6 +203,11 @@ class CsvTableSource(
   override def hashCode(): Int = {
     returnType.hashCode()
   }
+
+  override def explainSource(): String = {
+    s"CsvTableSource(" +
+      s"read fields: ${getReturnType.getFieldNames.mkString(", ")})"
+  }
 }
 
 object CsvTableSource {
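
A hedged usage sketch of the refactored class; the file path and schema are hypothetical. Projection now returns a copy whose logical schema is unchanged while the physical return type shrinks:

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.api.Types
    import org.apache.flink.table.sources.CsvTableSource

    val people = new CsvTableSource(
      "/tmp/people.csv",
      Array("name", "age", "city"),
      Array[TypeInformation[_]](Types.STRING, Types.INT, Types.STRING))

    val projected = people.projectFields(Array(0, 2))
    // the table schema still reports all three columns ...
    assert(projected.getTableSchema.getColumnNames.length == 3)
    // ... but only the two selected fields are read and returned
    assert(projected.getReturnType.getArity == 2)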

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala
new file mode 100644
index 0000000..6a2ccc9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util.{Map => JMap}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+
+/**
+  * The [[DefinedFieldMapping]] interface provides a mapping for the fields of the table schema
+  * ([[TableSource.getTableSchema]]) to fields of the physical returned type
+  * [[TableSource.getReturnType]] of a [[TableSource]].
+  *
+  * If a [[TableSource]] does not implement the [[DefinedFieldMapping]] interface, the fields of
+  * its [[TableSchema]] are mapped to the fields of its return type [[TypeInformation]] by name.
+  *
+  * If the fields cannot or should not be implicitly mapped by name, an explicit mapping can be
+  * provided by implementing this interface.
+  * If a mapping is provided, all fields must be explicitly mapped.
+  */
+trait DefinedFieldMapping {
+
+  /**
+    * Returns the mapping for the fields of the [[TableSource]]'s [[TableSchema]] to the fields of
+    * its return type [[TypeInformation]].
+    *
+    * The mapping is done based on field names, e.g., a mapping "name" -> "f1" maps the schema field
+    * "name" to the field "f1" of the return type, for example in this case the second field of a
+    * [[org.apache.flink.api.java.tuple.Tuple]].
+    *
+    * The returned mapping must map all fields (except proctime and rowtime fields) to the return
+    * type. It can also provide a mapping for fields which are not in the [[TableSchema]] to make
+    * fields in the physical [[TypeInformation]] accessible for a [[TimestampExtractor]].
+    *
+    * @return A mapping from [[TableSchema]] fields to [[TypeInformation]] fields.
+    */
+  def getFieldMapping: JMap[String, String]
+}
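
A minimal sketch of the new interface, assuming a source whose physical fields (f0, f1) differ from the logical schema (name, age); all names and data are made up:

    import java.util.{Arrays, HashMap => JHashMap, Map => JMap}

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.table.api.{TableSchema, Types}
    import org.apache.flink.table.sources.{DefinedFieldMapping, StreamTableSource}
    import org.apache.flink.types.Row

    class PersonSource extends StreamTableSource[Row] with DefinedFieldMapping {

      // physical type with generic field names
      override def getReturnType: TypeInformation[Row] = new RowTypeInfo(
        Array[TypeInformation[_]](Types.STRING, Types.INT),
        Array("f0", "f1"))

      // logical schema with the names exposed to queries
      override def getTableSchema: TableSchema = new TableSchema(
        Array("name", "age"),
        Array[TypeInformation[_]](Types.STRING, Types.INT))

      // explicit schema-to-physical mapping; all schema fields must be mapped
      override def getFieldMapping: JMap[String, String] = {
        val mapping = new JHashMap[String, String]()
        mapping.put("name", "f0")
        mapping.put("age", "f1")
        mapping
      }

      override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] =
        execEnv.fromCollection(Arrays.asList(Row.of("Alice", Int.box(42))), getReturnType)
    }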

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldNames.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldNames.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldNames.scala
deleted file mode 100644
index bead3e9..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldNames.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-/**
-  * Trait that defines custom field names and their indices in the underlying
-  * data type.
-  *
-  * Should be extended together with [[TableSource]] trait.
-  */
-trait DefinedFieldNames {
-
-  /** Returns the names of the table fields. */
-  def getFieldNames: Array[String]
-
-  /** Returns the indices of the table fields. */
-  def getFieldIndices: Array[Int]
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FieldComputer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FieldComputer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FieldComputer.scala
new file mode 100644
index 0000000..9728763
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FieldComputer.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.expressions.{Expression, ResolvedFieldReference}
+
+/**
+  * The [[FieldComputer]] interface returns an expression to compute the field of the table schema
+  * of a [[TableSource]] from one or more fields of the [[TableSource]]'s return type.
+  *
+  * @tparam T The result type of the provided expression.
+  */
+abstract class FieldComputer[T] {
+
+  /**
+    * Returns the names of all fields that the expression of the field computer accesses.
+    *
+    * @return An array with the names of all accessed fields.
+    */
+  def getArgumentFields: Array[String]
+
+  /**
+    * Returns the result type of the expression.
+    *
+    * @return The result type of the expression.
+    */
+  def getReturnType: TypeInformation[T]
+
+  /**
+    * Validates that the fields that the expression references have the correct types.
+    *
+    * @param argumentFieldTypes The types of the physical input fields.
+    */
+  @throws[ValidationException]
+  def validateArgumentFields(argumentFieldTypes: Array[TypeInformation[_]]): Unit
+
+  /**
+    * Returns the [[Expression]] that computes the value of the field.
+    *
+    * @param fieldAccesses Field access expressions for the argument fields.
+    * @return The expression to compute the value of the field from the [[TableSource]]
+    *         return type.
+    */
+  def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression
+
+}
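
A hedged sketch of a concrete computer, assuming a physical Long field "ts" that should surface as a SQL TIMESTAMP; the class name is made up:

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.api.{Types, ValidationException}
    import org.apache.flink.table.expressions.{Cast, Expression, ResolvedFieldReference}
    import org.apache.flink.table.sources.FieldComputer

    class LongToTimestamp extends FieldComputer[java.sql.Timestamp] {

      // the expression reads a single physical field named "ts"
      override def getArgumentFields: Array[String] = Array("ts")

      override def getReturnType: TypeInformation[java.sql.Timestamp] = Types.SQL_TIMESTAMP

      override def validateArgumentFields(
          argumentFieldTypes: Array[TypeInformation[_]]): Unit = {
        if (argumentFieldTypes(0) != Types.LONG) {
          throw new ValidationException("Field 'ts' must be of type Long.")
        }
      }

      // cast the epoch-millis field to a timestamp
      override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression =
        Cast(fieldAccesses(0), Types.SQL_TIMESTAMP)
    }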

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala
index a10187b..d0c7fdc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala
@@ -18,33 +18,59 @@
 
 package org.apache.flink.table.sources
 
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.streaming.api.datastream.DataStream
+
 /**
   * Adds support for projection push-down to a [[TableSource]] with nested fields.
-  * A [[TableSource]] extending this interface is able
-  * to project the nested fields of the returned table.
+  * A [[TableSource]] extending this interface is able to project the fields of the returned
+  * [[DataSet]] if it is a [[BatchTableSource]] or [[DataStream]] if it is a [[StreamTableSource]].
   *
-  * @tparam T The return type of the [[NestedFieldsProjectableTableSource]].
+  * @tparam T The return type of the [[TableSource]].
   */
 trait NestedFieldsProjectableTableSource[T] {
 
   /**
-    * Creates a copy of the [[TableSource]] that projects its output on the specified nested fields.
+    * Creates a copy of the [[TableSource]] that projects its output to the given field indexes.
+    * The field indexes relate to the physical return type ([[TableSource.getReturnType]]) and not
+    * to the table schema ([[TableSource.getTableSchema]]) of the [[TableSource]].
     *
-    * @param fields The indexes of the fields to return.
-    * @param nestedFields The accessed nested fields of the fields to return.
+    * The table schema ([[TableSource.getTableSchema]]) of the [[TableSource]] copy must not be
+    * modified by this method, but only the return type ([[TableSource.getReturnType]]) and the
+    * produced [[DataSet]] ([[BatchTableSource.getDataSet]]) or [[DataStream]]
+    * ([[StreamTableSource.getDataStream]]). The return type may only be changed by
+    * removing or reordering first level fields. The type of the first level fields must not be
+    * changed.
+    *
+    * If the [[TableSource]] implements the [[DefinedFieldMapping]] interface, it might
+    * be necessary to adjust the mapping as well.
+    *
+    * The nestedFields parameter contains all nested fields that are accessed by the query.
+    * This information can be used to only read and set the accessed fields.
+    * Non-accessed fields may be left empty, set to null, or to a default value.
     *
-    * e.g.
+    * The [[projectNestedFields()]] method is called with parameters as shown in the example below:
+    *
+    * // schema
     * tableSchema = {
     *       id,
     *       student<\school<\city, tuition>, age, name>,
     *       teacher<\age, name>
     *       }
     *
+    * // query
     * select (id, student.school.city, student.age, teacher)
     *
+    * // parameters
     * fields = [0, 1, 2]
     * nestedFields = \[\["*"\], \["school.city", "age"\], \["*"\]\]
     *
+    * IMPORTANT: This method must return a true copy and must not modify the original table source
+    * object.
+    *
+    * @param fields The indexes of the fields to return.
+    * @param nestedFields The paths of all nested fields which are accessed by the query. All other
+    *                     nested fields may be empty.
     * @return A copy of the [[TableSource]] that projects its output.
     */
   def projectNestedFields(
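
A hedged sketch of the resulting call, where `source` stands for an assumed implementation with the schema from the example above:

    val copy = source.projectNestedFields(
      Array(0, 1, 2),
      Array(Array("*"), Array("school.city", "age"), Array("*")))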

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala
index 570bdff..abb9970 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala
@@ -18,16 +18,33 @@
 
 package org.apache.flink.table.sources
 
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.streaming.api.datastream.DataStream
+
 /**
   * Adds support for projection push-down to a [[TableSource]].
-  * A [[TableSource]] extending this interface is able to project the fields of the return table.
+  * A [[TableSource]] extending this interface is able to project the fields of the returned
+  * [[DataSet]] if it is a [[BatchTableSource]] or [[DataStream]] if it is a [[StreamTableSource]].
   *
   * @tparam T The return type of the [[TableSource]].
   */
 trait ProjectableTableSource[T] {
 
   /**
-    * Creates a copy of the [[TableSource]] that projects its output on the specified fields.
+    * Creates a copy of the [[TableSource]] that projects its output to the given field indexes.
+    * The field indexes relate to the physical return type ([[TableSource.getReturnType]]) and not
+    * to the table schema ([[TableSource.getTableSchema]]) of the [[TableSource]].
+    *
+    * The table schema ([[TableSource.getTableSchema]]) of the [[TableSource]] copy must not be
+    * modified by this method, but only the return type ([[TableSource.getReturnType]]) and the
+    * produced [[DataSet]] ([[BatchTableSource.getDataSet]]) or [[DataStream]]
+    * ([[StreamTableSource.getDataStream]]).
+    *
+    * If the [[TableSource]] implements the [[DefinedFieldMapping]] interface, it might
+    * be necessary to adjust the mapping as well.
+    *
+    * IMPORTANT: This method must return a true copy and must not modify the original table source
+    * object.
     *
     * @param fields The indexes of the fields to return.
     * @return A copy of the [[TableSource]] that projects its output.
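
A hedged usage sketch; `source` stands for any implementation, and the CsvTableSource changes earlier in this commit show the contract implemented end to end:

    // keep fields 2 and 0, reordered; `source` itself must stay unmodified
    val copy = source.projectFields(Array(2, 0))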

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
index d9ebc5a..b082a53 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
@@ -18,26 +18,46 @@
 
 package org.apache.flink.table.sources
 
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
 
-/** Defines an external table by providing schema information and used to produce a
-  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
-  * Schema information consists of a data type, field names, and corresponding indices of
-  * these names in the data type.
+/**
+  * Defines an external table with the schema that is provided by [[TableSource#getTableSchema]].
   *
-  * To define a TableSource one needs to implement [[TableSource#getReturnType]]. In this case
-  * field names and field indices are derived from the returned type.
+  * The data of a [[TableSource]] is produced as a [[DataSet]] in case of a [[BatchTableSource]] or
+  * as a [[DataStream]] in case of a [[StreamTableSource]].
+  * The type of the produced [[DataSet]] or [[DataStream]] is specified by the
+  * [[TableSource#getReturnType]] method.
   *
-  * In case if custom field names are required one need to additionally implement
-  * the [[DefinedFieldNames]] trait.
+  * By default, the fields of the [[TableSchema]] are implicitly mapped by name to the fields of the
+  * return type [[TypeInformation]]. An explicit mapping can be defined by implementing the
+  * [[DefinedFieldMapping]] interface.
   *
   * @tparam T The return type of the [[TableSource]].
   */
 trait TableSource[T] {
 
-  /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */
+  /** Returns the [[TypeInformation]] for the return type of the [[TableSource]].
+    * The fields of the return type are mapped to the table schema based on their name.
+    *
+    * @return The type of the returned [[DataSet]] or [[DataStream]].
+    */
   def getReturnType: TypeInformation[T]
 
-  /** Describes the table source */
+  /**
+    * Returns the schema of the produced table.
+    *
+    * @return The [[TableSchema]] of the produced table.
+    */
+  def getTableSchema: TableSchema
+
+  /**
+    * Describes the table source.
+    *
+    * @return A String explaining the [[TableSource]].
+    */
   def explainSource(): String = ""
+
 }
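
A minimal sketch of the revised trait, assuming the default by-name mapping (schema and return type share field names); all names and data are made up:

    import java.util.Arrays

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.table.api.{TableSchema, Types}
    import org.apache.flink.table.sources.BatchTableSource
    import org.apache.flink.types.Row

    class GreetingsSource extends BatchTableSource[Row] {

      private val names = Array("id", "greeting")
      private val types = Array[TypeInformation[_]](Types.LONG, Types.STRING)

      // physical type and logical schema use identical names,
      // so the implicit by-name mapping applies
      override def getReturnType: TypeInformation[Row] = new RowTypeInfo(types, names)

      override def getTableSchema: TableSchema = new TableSchema(names, types)

      override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] =
        execEnv.fromCollection(Arrays.asList(Row.of(Long.box(1L), "hello")), getReturnType)

      override def explainSource(): String = "GreetingsSource(id, greeting)"
    }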

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
new file mode 100644
index 0000000..48ab3de
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.sql.Timestamp
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptCluster
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.api.{TableException, Types, ValidationException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Cast, ResolvedFieldReference}
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+
+import scala.collection.JavaConverters._
+
+/** Util class for [[TableSource]]. */
+object TableSourceUtil {
+
+  /** Returns true if the [[TableSource]] has a rowtime attribute. */
+  def hasRowtimeAttribute(tableSource: TableSource[_]): Boolean =
+    getRowtimeAttributes(tableSource).nonEmpty
+
+  /** Returns true if the [[TableSource]] has a proctime attribute. */
+  def hasProctimeAttribute(tableSource: TableSource[_]): Boolean =
+    getProctimeAttribute(tableSource).nonEmpty
+
+  /**
+    * Validates a TableSource.
+    *
+    * - checks that all fields of the schema can be resolved
+    * - checks that resolved fields have the correct type
+    * - checks that the time attributes are correctly configured.
+    *
+    * @param tableSource The [[TableSource]] to validate.
+    */
+  def validateTableSource(tableSource: TableSource[_]): Unit = {
+
+    val schema = tableSource.getTableSchema
+    val tableFieldNames = schema.getColumnNames
+    val tableFieldTypes = schema.getTypes
+
+    // get rowtime and proctime attributes
+    val rowtimeAttributes = getRowtimeAttributes(tableSource)
+    val proctimeAttribute = getProctimeAttribute(tableSource)
+
+    // validate that schema fields can be resolved to a return type field of correct type
+    var mappedFieldCnt = 0
+    tableFieldTypes.zip(tableFieldNames).foreach {
+      case (t: SqlTimeTypeInfo[_], name: String)
+        if t.getTypeClass == classOf[Timestamp] && proctimeAttribute.contains(name) =>
+        // OK, field was mapped to proctime attribute
+      case (t: SqlTimeTypeInfo[_], name: String)
+        if t.getTypeClass == classOf[Timestamp] && rowtimeAttributes.contains(name) =>
+        // OK, field was mapped to rowtime attribute
+      case (t: TypeInformation[_], name) =>
+        // check if field is registered as time indicator
+        if (getProctimeAttribute(tableSource).contains(name)) {
+          throw new ValidationException(s"Processing time field '$name' has invalid type $t. " +
+            s"Processing time attributes must be of type ${Types.SQL_TIMESTAMP}.")
+        }
+        if (getRowtimeAttributes(tableSource).contains(name)) {
+          throw new ValidationException(s"Rowtime field '$name' has invalid type $t. " +
+            s"Rowtime attributes must be of type ${Types.SQL_TIMESTAMP}.")
+        }
+        // check that field can be resolved in input type
+        val (physicalName, _, tpe) = resolveInputField(name, tableSource)
+        // validate that mapped fields are of the same type
+        if (tpe != t) {
+          throw ValidationException(s"Type $t of table field '$name' does not " +
+            s"match with type $tpe of the field '$physicalName' of the TableSource return type.")
+        }
+        mappedFieldCnt += 1
+    }
+    // ensure that only one field is mapped to an atomic type
+    if (tableSource.getReturnType.isInstanceOf[AtomicType[_]] && mappedFieldCnt > 1) {
+      throw ValidationException(
+        s"More than one table field matched to atomic input type ${tableSource.getReturnType}.")
+    }
+
+    // validate rowtime attributes
+    tableSource match {
+      case r: DefinedRowtimeAttributes =>
+        val descriptors = r.getRowtimeAttributeDescriptors
+        if (descriptors.size() > 1) {
+          throw ValidationException("Currently, only a single rowtime attribute is supported. " +
+            s"Please remove all but one RowtimeAttributeDescriptor.")
+        } else if (descriptors.size() == 1) {
+          val descriptor = descriptors.get(0)
+          val rowtimeAttribute = descriptor.getAttributeName
+          val rowtimeIdx = schema.getColumnNames.indexOf(rowtimeAttribute)
+          // ensure that field exists
+          if (rowtimeIdx < 0) {
+            throw ValidationException(s"Found a RowtimeAttributeDescriptor for field " +
+              s"'$rowtimeAttribute' but field '$rowtimeAttribute' does not exist in table.")
+          }
+          // ensure that field is of type TIMESTAMP
+          if (schema.getTypes(rowtimeIdx) != Types.SQL_TIMESTAMP) {
+            throw ValidationException(s"Found a RowtimeAttributeDescriptor for field " +
+              s"'$rowtimeAttribute' but field '$rowtimeAttribute' is not of type TIMESTAMP.")
+          }
+          // look up extractor input fields in return type
+          val extractorInputFields = descriptor.getTimestampExtractor.getArgumentFields
+          val physicalTypes = resolveInputFields(extractorInputFields, tableSource).map(_._3)
+          // validate timestamp extractor
+          descriptor.getTimestampExtractor.validateArgumentFields(physicalTypes)
+        }
+      case _ => // nothing to validate
+    }
+
+    // validate proctime attribute
+    tableSource match {
+      case p: DefinedProctimeAttribute if p.getProctimeAttribute != null =>
+        val proctimeAttribute = p.getProctimeAttribute
+        val proctimeIdx = schema.getColumnNames.indexOf(proctimeAttribute)
+        // ensure that field exists
+        if (proctimeIdx < 0) {
+          throw ValidationException(s"Found a RowtimeAttributeDescriptor for field " +
+            s"'$proctimeAttribute' but field '$proctimeAttribute' does not exist in table.")
+        }
+        // ensure that field is of type TIMESTAMP
+        if (schema.getTypes(proctimeIdx) != Types.SQL_TIMESTAMP) {
+          throw ValidationException(s"Found a RowtimeAttributeDescriptor for field " +
+            s"'$proctimeAttribute' but field '$proctimeAttribute' is not of type TIMESTAMP.")
+        }
+      case _ => // nothing to validate
+    }
+
+    // ensure that proctime and rowtime attribute do not overlap
+    if (proctimeAttribute.isDefined && rowtimeAttributes.contains(proctimeAttribute.get)) {
+      throw new ValidationException(s"Field '${proctimeAttribute.get}' must not be " +
+        s"both a processing time and a rowtime attribute.")
+    }
+  }
+
+  /**
+    * Computes the indices that map the return type of the [[TableSource]] to the table schema.
+    *
+    * The mapping is based on the field names and fails if a table field cannot be
+    * mapped to a field of the input type.
+    *
+    * @param tableSource The table source for which the table schema is mapped to the input type.
+    * @param isStreamTable True if the mapping is computed for a streaming table, false otherwise.
+    * @param selectedFields The indexes of the table schema fields for which a mapping is
+    *                       computed. If None, a mapping for all fields is computed.
+    * @return An index mapping from input type to table schema.
+    */
+  def computeIndexMapping(
+      tableSource: TableSource[_],
+      isStreamTable: Boolean,
+      selectedFields: Option[Array[Int]]): Array[Int] = {
+    val inputType = tableSource.getReturnType
+    val tableSchema = tableSource.getTableSchema
+
+    // get names of selected fields
+    val tableFieldNames = if (selectedFields.isDefined) {
+      val names = tableSchema.getColumnNames
+      selectedFields.get.map(names(_))
+    } else {
+      tableSchema.getColumnNames
+    }
+
+    // get types of selected fields
+    val tableFieldTypes = if (selectedFields.isDefined) {
+      val types = tableSchema.getTypes
+      selectedFields.get.map(types(_))
+    } else {
+      tableSchema.getTypes
+    }
+
+    // get rowtime and proctime attributes
+    val rowtimeAttributes = getRowtimeAttributes(tableSource)
+    val proctimeAttributes = getProctimeAttribute(tableSource)
+
+    // compute mapping of selected fields and time attributes
+    val mapping: Array[Int] = tableFieldTypes.zip(tableFieldNames).map {
+      case (t: SqlTimeTypeInfo[_], name: String)
+        if t.getTypeClass == classOf[Timestamp] && proctimeAttributes.contains(name) =>
+        if (isStreamTable) {
+          TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER
+        } else {
+          TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER
+        }
+      case (t: SqlTimeTypeInfo[_], name: String)
+        if t.getTypeClass == classOf[Timestamp] && rowtimeAttributes.contains(name) =>
+        if (isStreamTable) {
+          TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER
+        } else {
+          TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER
+        }
+      case (t: TypeInformation[_], name) =>
+        // check if field is registered as time indicator
+        if (getProctimeAttribute(tableSource).contains(name)) {
+          throw new ValidationException(s"Processing time field '$name' has invalid type $t. " +
+            s"Processing time attributes must be of type ${Types.SQL_TIMESTAMP}.")
+        }
+        if (getRowtimeAttributes(tableSource).contains(name)) {
+          throw new ValidationException(s"Rowtime field '$name' has invalid type $t. " +
+            s"Rowtime attributes must be of type ${Types.SQL_TIMESTAMP}.")
+        }
+
+        val (physicalName, idx, tpe) = resolveInputField(name, tableSource)
+        // validate that mapped fields are of the same type
+        if (tpe != t) {
+          throw ValidationException(s"Type $t of table field '$name' does not " +
+            s"match the type $tpe of the field '$physicalName' of the TableSource return type.")
+        }
+        idx
+    }
+
+    // ensure that only one field is mapped to an atomic type
+    if (inputType.isInstanceOf[AtomicType[_]] && mapping.count(_ >= 0) > 1) {
+      throw ValidationException(
+        s"More than one table field matched to atomic input type $inputType.")
+    }
+
+    mapping
+  }
+
+  /**
+    * Returns the Calcite schema of a [[TableSource]].
+    *
+    * @param tableSource The [[TableSource]] for which the Calcite schema is generated.
+    * @param selectedFields The indices of all selected fields, or None if all fields are selected.
+    * @param streaming Flag to determine whether the schema of a stream or batch table is created.
+    * @param typeFactory The type factory to create the schema.
+    * @return The Calcite schema for the selected fields of the given [[TableSource]].
+    */
+  def getRelDataType(
+      tableSource: TableSource[_],
+      selectedFields: Option[Array[Int]],
+      streaming: Boolean,
+      typeFactory: FlinkTypeFactory): RelDataType = {
+
+    val fieldNames = tableSource.getTableSchema.getColumnNames
+    var fieldTypes = tableSource.getTableSchema.getTypes
+
+    if (streaming) {
+      // adjust the type of time attributes for streaming tables
+      val rowtimeAttributes = getRowtimeAttributes(tableSource)
+      val proctimeAttributes = getProctimeAttribute(tableSource)
+
+      // patch rowtime fields with time indicator type
+      rowtimeAttributes.foreach { rowtimeField =>
+        val idx = fieldNames.indexOf(rowtimeField)
+        fieldTypes = fieldTypes.patch(idx, Seq(TimeIndicatorTypeInfo.ROWTIME_INDICATOR), 1)
+      }
+      // patch proctime field with time indicator type
+      proctimeAttributes.foreach { proctimeField =>
+        val idx = fieldNames.indexOf(proctimeField)
+        fieldTypes = fieldTypes.patch(idx, Seq(TimeIndicatorTypeInfo.PROCTIME_INDICATOR), 1)
+      }
+    }
+
+    val (selectedFieldNames, selectedFieldTypes) = if (selectedFields.isDefined) {
+      // filter field names and types by selected fields
+      (selectedFields.get.map(fieldNames(_)), selectedFields.get.map(fieldTypes(_)))
+    } else {
+      (fieldNames, fieldTypes)
+    }
+    typeFactory.buildLogicalRowType(selectedFieldNames, selectedFieldTypes)
+  }
+
+  /**
+    * Returns the [[RowtimeAttributeDescriptor]] of a [[TableSource]].
+    *
+    * @param tableSource The [[TableSource]] for which the [[RowtimeAttributeDescriptor]] is
+    *                    returned.
+    * @param selectedFields The fields which are selected from the [[TableSource]].
+    *                       If None, all fields are selected.
+    * @return The [[RowtimeAttributeDescriptor]] of the [[TableSource]].
+    */
+  def getRowtimeAttributeDescriptor(
+      tableSource: TableSource[_],
+      selectedFields: Option[Array[Int]]): Option[RowtimeAttributeDescriptor] = {
+
+    tableSource match {
+      case r: DefinedRowtimeAttributes =>
+        val descriptors = r.getRowtimeAttributeDescriptors
+        if (descriptors.size() == 0) {
+          None
+        } else if (descriptors.size() > 1) {
+          throw ValidationException("Table has more than a single rowtime attribute.")
+        } else {
+          // exactly one rowtime attribute descriptor
+          if (selectedFields.isEmpty) {
+            // all fields are selected.
+            Some(descriptors.get(0))
+          } else {
+            val descriptor = descriptors.get(0)
+            // look up index of row time attribute in schema
+            val fieldIdx = tableSource.getTableSchema.getColumnNames.indexOf(
+              descriptor.getAttributeName)
+            // is field among selected fields?
+            if (selectedFields.get.contains(fieldIdx)) {
+              Some(descriptor)
+            } else {
+              None
+            }
+          }
+        }
+      case _ => None
+    }
+  }
+
+  /**
+    * Obtains the [[RexNode]] expression to extract the rowtime timestamp for a [[TableSource]].
+    *
+    * @param tableSource The [[TableSource]] for which the expression is extracted.
+    * @param selectedFields The selected fields of the [[TableSource]].
+    *                       If None, all fields are selected.
+    * @param cluster The [[RelOptCluster]] of the current optimization process.
+    * @param relBuilder The [[RelBuilder]] to build the [[RexNode]].
+    * @param resultType The result type of the timestamp expression.
+    * @return The [[RexNode]] expression to extract the timestamp of the table source.
+    */
+  def getRowtimeExtractionExpression(
+      tableSource: TableSource[_],
+      selectedFields: Option[Array[Int]],
+      cluster: RelOptCluster,
+      relBuilder: RelBuilder,
+      resultType: TypeInformation[_]): Option[RexNode] = {
+
+    val typeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+    /**
+      * Creates a RelNode with a schema that corresponds to the given fields.
+      * Fields for which no information is available will have default values.
+      */
+    def createSchemaRelNode(fields: Array[(String, Int, TypeInformation[_])]): RelNode = {
+      val maxIdx = fields.map(_._2).max
+      val idxMap: Map[Int, (String, TypeInformation[_])] = Map(
+        fields.map(f => f._2 -> (f._1, f._3)): _*)
+      val (physicalFields, physicalTypes) = (0 to maxIdx)
+        .map(i => idxMap.getOrElse(i, ("", Types.BYTE))).unzip
+      val physicalSchema: RelDataType = typeFactory.buildLogicalRowType(
+        physicalFields,
+        physicalTypes)
+      LogicalValues.create(
+        cluster,
+        physicalSchema,
+        ImmutableList.of().asInstanceOf[ImmutableList[ImmutableList[RexLiteral]]])
+    }
+
+    val rowtimeDesc = getRowtimeAttributeDescriptor(tableSource, selectedFields)
+    rowtimeDesc.map { r =>
+      val tsExtractor = r.getTimestampExtractor
+
+      val fieldAccesses = if (tsExtractor.getArgumentFields.nonEmpty) {
+        val resolvedFields = resolveInputFields(tsExtractor.getArgumentFields, tableSource)
+        // push an empty values node with the physical schema on the relbuilder
+        relBuilder.push(createSchemaRelNode(resolvedFields))
+        // get extraction expression
+        resolvedFields.map(f => ResolvedFieldReference(f._1, f._3))
+      } else {
+        new Array[ResolvedFieldReference](0)
+      }
+
+      val expression = tsExtractor.getExpression(fieldAccesses)
+      // add cast to requested type and convert expression to RexNode
+      val rexExpression = Cast(expression, resultType).toRexNode(relBuilder)
+      relBuilder.clear()
+      rexExpression
+    }
+  }
+
+  /**
+    * Returns the indexes of the physical fields that are required to compute the given logical
+    *
+    * @param tableSource The [[TableSource]] for which the physical indexes are computed.
+    * @param logicalFieldIndexes The indexes of the accessed logical fields for which the physical
+    *                            indexes are computed.
+    * @return The indexes of the physical fields that are accessed to forward and compute the
+    *         logical fields.
+    */
+  def getPhysicalIndexes(
+      tableSource: TableSource[_],
+      logicalFieldIndexes: Array[Int]): Array[Int] = {
+
+    // get the mapping from logical to physical positions.
+    // stream / batch distinction not important here
+    val fieldMapping = computeIndexMapping(tableSource, isStreamTable = true, None)
+
+    logicalFieldIndexes
+      // resolve logical indexes to physical indexes
+      .map(fieldMapping(_))
+      // resolve time indicator markers to physical indexes
+      .flatMap {
+      case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER =>
+        // proctime fields do not access a physical field
+        Seq()
+      case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER =>
+        // rowtime field is computed.
+        // get names of fields which are accessed by the expression to compute the rowtime field.
+        val rowtimeAttributeDescriptor = getRowtimeAttributeDescriptor(tableSource, None)
+        val accessedFields = if (rowtimeAttributeDescriptor.isDefined) {
+          rowtimeAttributeDescriptor.get.getTimestampExtractor.getArgumentFields
+        } else {
+          throw TableException("Computed field mapping includes a rowtime marker but the " +
+            "TableSource does not provide a RowtimeAttributeDescriptor. " +
+            "This is a bug and should be reported.")
+        }
+        // resolve field names to physical fields
+        resolveInputFields(accessedFields, tableSource).map(_._2)
+      case idx =>
+        Seq(idx)
+    }
+  }
+
+  /** Returns a list with all rowtime attribute names of the [[TableSource]]. */
+  private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = {
+    tableSource match {
+      case r: DefinedRowtimeAttributes =>
+        r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray
+      case _ =>
+        Array()
+    }
+  }
+
+  /** Returns the proctime attribute of the [[TableSource]] if it is defined. */
+  private def getProctimeAttribute(tableSource: TableSource[_]): Option[String] = {
+    tableSource match {
+      case p: DefinedProctimeAttribute if p.getProctimeAttribute != null =>
+        Some(p.getProctimeAttribute)
+      case _ =>
+        None
+    }
+  }
+
+  /**
+    * Identifies, for a field name of the logical schema, the corresponding physical field in
+    * the return type of a [[TableSource]].
+    *
+    * @param fieldName The logical field to look up.
+    * @param tableSource The table source in which to look for the field.
+    * @return The name, index, and type information of the physical field.
+    */
+  private def resolveInputField(
+      fieldName: String,
+      tableSource: TableSource[_]): (String, Int, TypeInformation[_]) = {
+
+    val returnType = tableSource.getReturnType
+
+    /** Looks up a field by name in the return type. */
+    def lookupField(fieldName: String, failMsg: String): (String, Int, TypeInformation[_]) = {
+      returnType match {
+        case a: AtomicType[_] =>
+          // no composite type, we return the full atomic type as field
+          (fieldName, 0, a)
+        case c: CompositeType[_] =>
+          // get and check field index
+          val idx = c.getFieldIndex(fieldName)
+          if (idx < 0) {
+            throw ValidationException(failMsg)
+          }
+          // return field name, index, and field type
+          (fieldName, idx, c.getTypeAt(idx))
+        case _ => throw TableException("Unexpected type information.")
+      }
+    }
+
+    tableSource match {
+      case d: DefinedFieldMapping if d.getFieldMapping != null =>
+        // resolve field name in field mapping
+        val resolvedFieldName = d.getFieldMapping.get(fieldName)
+        if (resolvedFieldName == null) {
+          throw ValidationException(
+            s"Field '$fieldName' could not be resolved by the field mapping.")
+        }
+        // look up resolved field in return type
+        lookupField(
+          resolvedFieldName,
+          s"Table field '$fieldName' was resolved to TableSource return type field " +
+            s"'$resolvedFieldName', but field '$resolvedFieldName' was not found in the return " +
+            s"type $returnType of the TableSource. " +
+            s"Please verify the field mapping of the TableSource.")
+      case _ =>
+        // look up field in return type
+        lookupField(
+          fieldName,
+          s"Table field '$fieldName' was not found in the return type $returnType of the " +
+            s"TableSource.")
+    }
+  }
+
+  /**
+    * Identifies the physical fields in the return type [[TypeInformation]] of a [[TableSource]]
+    * for a list of field names of the [[TableSource]]'s [[org.apache.flink.table.api.TableSchema]].
+    *
+    * @param fieldNames The field names to look up.
+    * @param tableSource The table source in which to look for the fields.
+    * @return The names, indexes, and type information of the physical fields.
+    */
+  private def resolveInputFields(
+      fieldNames: Array[String],
+      tableSource: TableSource[_]): Array[(String, Int, TypeInformation[_])] = {
+    fieldNames.map(resolveInputField(_, tableSource))
+  }
+
+}
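
As a point of reference for the validation and index-mapping logic above, the following is a minimal sketch (not part of this commit; the class, field, and value names are illustrative) of a StreamTableSource that declares a rowtime attribute 'ts' backed by a physical Long field 't', an appended proctime attribute 'ptime', and a regular payload field 'v':

  import java.util
  import java.util.Collections

  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.api.java.typeutils.RowTypeInfo
  import org.apache.flink.streaming.api.datastream.DataStream
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
  import org.apache.flink.table.api.{TableSchema, Types}
  import org.apache.flink.table.sources._
  import org.apache.flink.types.Row

  class ExampleTimeAttributeSource extends StreamTableSource[Row]
    with DefinedRowtimeAttributes
    with DefinedProctimeAttribute {

    // logical schema: rowtime 'ts', payload 'v', appended proctime 'ptime'
    override def getTableSchema: TableSchema = new TableSchema(
      Array("ts", "v", "ptime"),
      Array[TypeInformation[_]](Types.SQL_TIMESTAMP, Types.STRING, Types.SQL_TIMESTAMP))

    // physical return type: only 't' and 'v' exist in the DataStream records
    override def getReturnType: TypeInformation[Row] = new RowTypeInfo(
      Array[TypeInformation[_]](Types.LONG, Types.STRING),
      Array("t", "v"))

    // 'ts' is derived from the existing Long field 't'; watermarks lag by 5s
    override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] =
      Collections.singletonList(new RowtimeAttributeDescriptor(
        "ts", new ExistingField("t"), new BoundedOutOfOrderWatermarks(5000L)))

    override def getProctimeAttribute: String = "ptime"

    // the actual stream is irrelevant for this sketch
    override def getDataStream(env: StreamExecutionEnvironment): DataStream[Row] = ???
  }

For such a source, computeIndexMapping on a stream table would yield Array(ROWTIME_STREAM_MARKER, 1, PROCTIME_STREAM_MARKER): both time attributes map to markers, and only 'v' maps to a physical index.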

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
index babd815..bfc06f9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
@@ -18,54 +18,61 @@
 
 package org.apache.flink.table.sources
 
+import java.util
+
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.api.Types
+
 /**
-  * Defines a logical event-time attribute for a [[TableSource]].
-  * The event-time attribute can be used for indicating, accessing, and working with Flink's
-  * event-time.
-  *
-  * A [[TableSource]] that implements this interface defines the name of
-  * the event-time attribute. The attribute must be present in the schema of the [[TableSource]]
-  * and must be of type [[Long]] or [[java.sql.Timestamp]].
+  * Extends a [[TableSource]] to specify a processing time attribute.
   */
-trait DefinedRowtimeAttribute {
+trait DefinedProctimeAttribute {
 
   /**
-    * Defines a name of the event-time attribute that represents Flink's event-time, i.e., an
-    * attribute that is aligned with the watermarks of the
-    * [[org.apache.flink.streaming.api.datastream.DataStream]] returned by
-    * [[StreamTableSource.getDataStream()]].
-    *
-    * An attribute with the given name must be present in the schema of the [[TableSource]].
-    * The attribute must be of type [[Long]] or [[java.sql.Timestamp]].
+    * Returns the name of a processing time attribute or null if no processing time attribute is
+    * present.
     *
-    * The method should return null if no rowtime attribute is defined.
-    *
-    * @return The name of the field that represents the event-time field and which is aligned
-    *         with the watermarks of the [[org.apache.flink.streaming.api.datastream.DataStream]]
-    *         returned by [[StreamTableSource.getDataStream()]].
-    *         The field must be present in the schema of the [[TableSource]] and be of type [[Long]]
-    *         or [[java.sql.Timestamp]].
+    * The referenced attribute must be present in the [[TableSchema]] of the [[TableSource]] and of
+    * type [[Types.SQL_TIMESTAMP]].
     */
-  def getRowtimeAttribute: String
+  def getProctimeAttribute: String
 }
 
 /**
-  * Defines a logical processing-time attribute for a [[TableSource]].
-  * The processing-time attribute can be used for indicating, accessing, and working with Flink's
-  * processing-time.
-  *
-  * A [[TableSource]] that implements this interface defines the name of
-  * the processing-time attribute. The attribute will be added to the schema of the
-  * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]].
+  * Extends a [[TableSource]] to specify rowtime attributes via a
+  * [[RowtimeAttributeDescriptor]].
   */
-trait DefinedProctimeAttribute {
+trait DefinedRowtimeAttributes {
 
   /**
-    * Defines a name of the processing-time attribute that represents Flink's
-    * processing-time. Null if no rowtime should be available.
+    * Returns a list of [[RowtimeAttributeDescriptor]] for all rowtime attributes of the table.
     *
-    * The field will be appended to the schema provided by the [[TableSource]].
+    * All referenced attributes must be present in the [[TableSchema]] of the [[TableSource]] and of
+    * type [[Types.SQL_TIMESTAMP]].
+    *
+    * @return A list of [[RowtimeAttributeDescriptor]].
     */
-  def getProctimeAttribute: String
+  def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
+}
+
+/**
+  * Describes a rowtime attribute of a [[TableSource]].
+  *
+  * @param attributeName The name of the rowtime attribute.
+  * @param timestampExtractor The timestamp extractor to derive the values of the attribute.
+  * @param watermarkStrategy The watermark strategy associated with the attribute.
+  */
+class RowtimeAttributeDescriptor(
+    attributeName: String,
+    timestampExtractor: TimestampExtractor,
+    watermarkStrategy: WatermarkStrategy) {
+
+  /** Returns the name of the rowtime attribute. */
+  def getAttributeName: String = attributeName
+
+  /** Returns the [[TimestampExtractor]] for the attribute. */
+  def getTimestampExtractor: TimestampExtractor = timestampExtractor
 
+  /** Returns the [[WatermarkStrategy]] for the attribute. */
+  def getWatermarkStrategy: WatermarkStrategy = watermarkStrategy
 }
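
Since the new trait returns a list while the planner currently supports only a single rowtime attribute (see TableSourceUtil above), a consumer has to unwrap the list defensively. A small sketch under that assumption (the object and method names are hypothetical):

  import org.apache.flink.table.sources.{DefinedRowtimeAttributes, RowtimeAttributeDescriptor}

  object RowtimeDescriptors {

    /** Returns the single descriptor, if any; fails for more than one. */
    def single(source: DefinedRowtimeAttributes): Option[RowtimeAttributeDescriptor] = {
      val descriptors = source.getRowtimeAttributeDescriptors
      descriptors.size() match {
        case 0 => None
        case 1 => Some(descriptors.get(0))
        case _ => throw new IllegalArgumentException(
          "Only a single rowtime attribute is currently supported.")
      }
    }
  }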

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/timestampExtractors.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/timestampExtractors.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/timestampExtractors.scala
new file mode 100644
index 0000000..e0f01d5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/timestampExtractors.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.expressions.{Cast, Expression, ResolvedFieldReference}
+
+/**
+  * Provides an expression to extract the timestamp for a rowtime attribute.
+  */
+abstract class TimestampExtractor extends FieldComputer[Long] {
+
+  /** Timestamp extractors compute the timestamp as Long. */
+  override def getReturnType: TypeInformation[Long] = Types.LONG.asInstanceOf[TypeInformation[Long]]
+}
+
+/**
+  * Converts an existing [[Long]] or [[java.sql.Timestamp]] field into a rowtime attribute.
+  *
+  * @param field The field to convert into a rowtime attribute.
+  */
+class ExistingField(field: String) extends TimestampExtractor {
+
+  override def getArgumentFields: Array[String] = Array(field)
+
+  @throws[ValidationException]
+  override def validateArgumentFields(physicalFieldTypes: Array[TypeInformation[_]]): Unit = {
+
+    // get type of field to convert
+    val fieldType = physicalFieldTypes(0)
+
+    // check that the field to convert is of type Long or Timestamp
+    fieldType match {
+      case Types.LONG => // OK
+      case Types.SQL_TIMESTAMP => // OK
+      case _: TypeInformation[_] =>
+        throw ValidationException(
+          s"Field '$field' must be of type Long or Timestamp but is of type $fieldType.")
+    }
+  }
+
+  /**
+    * Returns an [[Expression]] that casts a [[Long]] or [[java.sql.Timestamp]] field into a
+    * rowtime attribute.
+    */
+  def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
+
+    val fieldAccess: Expression = fieldAccesses(0)
+
+    fieldAccess.resultType match {
+      case Types.LONG =>
+        // access LONG field
+        fieldAccess
+      case Types.SQL_TIMESTAMP =>
+        // cast timestamp to long
+        Cast(fieldAccess, Types.LONG)
+    }
+  }
+
+}
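
ExistingField is the only extractor shipped here, but the abstraction admits custom extractors. A hedged sketch of one possible extractor (not part of this commit) that interprets a Long field holding seconds since the epoch and scales it to the milliseconds the rowtime attribute expects; it assumes the Mul and Literal expression classes compose the way Cast does in ExistingField:

  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.table.api.{Types, ValidationException}
  import org.apache.flink.table.expressions.{Expression, Literal, Mul, ResolvedFieldReference}
  import org.apache.flink.table.sources.TimestampExtractor

  class SecondsField(field: String) extends TimestampExtractor {

    override def getArgumentFields: Array[String] = Array(field)

    @throws[ValidationException]
    override def validateArgumentFields(physicalFieldTypes: Array[TypeInformation[_]]): Unit = {
      if (physicalFieldTypes(0) != Types.LONG) {
        throw ValidationException(
          s"Field '$field' must be of type Long but is of type ${physicalFieldTypes(0)}.")
      }
    }

    /** Multiplies the seconds value by 1000 to obtain epoch milliseconds. */
    def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression =
      Mul(fieldAccesses(0), Literal(1000L, Types.LONG))
  }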

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/watermarkStrategies.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/watermarkStrategies.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/watermarkStrategies.scala
new file mode 100644
index 0000000..eec423f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/watermarkStrategies.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.types.Row
+
+/**
+  * Provides a strategy to generate watermarks for a rowtime attribute.
+  *
+  * A watermark strategy is either a [[PeriodicWatermarkAssigner]] or a
+  * [[PunctuatedWatermarkAssigner]].
+  *
+  */
+sealed abstract class WatermarkStrategy extends Serializable
+
+/** A periodic watermark assigner. */
+abstract class PeriodicWatermarkAssigner extends WatermarkStrategy {
+
+  /**
+    * Updates the assigner with the next timestamp.
+    *
+    * @param timestamp The next timestamp to update the assigner.
+    */
+  def nextTimestamp(timestamp: Long): Unit
+
+  /**
+    * Returns the current watermark.
+    *
+    * @return The current watermark.
+    */
+  def getWatermark: Watermark
+}
+
+/** A punctuated watermark assigner. */
+abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
+
+  /**
+    * Returns the watermark for the current row or null if no watermark should be generated.
+    *
+    * @param row The current row.
+    * @param timestamp The value of the timestamp attribute for the row.
+    * @return The watermark for this row or null if no watermark should be generated.
+    */
+  def getWatermark(row: Row, timestamp: Long): Watermark
+}
+
+/**
+  * A watermark assigner for ascending rowtime attributes.
+  *
+  * Emits a watermark of the maximum observed timestamp so far minus 1.
+  * Rows that have a timestamp equal to the max timestamp are not late.
+  */
+class AscendingWatermarks extends PeriodicWatermarkAssigner {
+
+  var maxTimestamp: Long = Long.MinValue + 1
+
+  override def nextTimestamp(timestamp: Long): Unit = {
+    if (timestamp > maxTimestamp) {
+      maxTimestamp = timestamp
+    }
+  }
+
+  override def getWatermark: Watermark = new Watermark(maxTimestamp - 1)
+}
+
+/**
+  * A watermark assigner for rowtime attributes which are out-of-order by a bounded time interval.
+  *
+  * Emits watermarks which are the maximum observed timestamp minus the specified delay.
+  *
+  * @param delay The delay by which watermarks are behind the maximum observed timestamp.
+  */
+class BoundedOutOfOrderWatermarks(val delay: Long) extends PeriodicWatermarkAssigner {
+
+  var maxTimestamp: Long = Long.MinValue + delay
+
+  override def nextTimestamp(timestamp: Long): Unit = {
+    if (timestamp > maxTimestamp) {
+      maxTimestamp = timestamp
+    }
+  }
+
+  override def getWatermark: Watermark = new Watermark(maxTimestamp - delay)
+}
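
The commit ships two periodic assigners; punctuated strategies are left to users. A possible PunctuatedWatermarkAssigner, sketched here for illustration (the marker-flag convention in the first column is an assumption, not part of this commit):

  import org.apache.flink.streaming.api.watermark.Watermark
  import org.apache.flink.table.sources.PunctuatedWatermarkAssigner
  import org.apache.flink.types.Row

  class MarkerPunctuatedWatermarks extends PunctuatedWatermarkAssigner {

    /** Emits a watermark only for rows whose first field is the flag 'true'. */
    override def getWatermark(row: Row, timestamp: Long): Watermark =
      if (row.getField(0) == java.lang.Boolean.TRUE) new Watermark(timestamp) else null
  }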

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
index 824f3fb..ad82d52 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
@@ -47,8 +47,11 @@ class TimeIndicatorTypeInfo(val isEventTime: Boolean)
 
 object TimeIndicatorTypeInfo {
 
-  val ROWTIME_MARKER: Int = -1
-  val PROCTIME_MARKER: Int = -2
+  val ROWTIME_STREAM_MARKER: Int = -1
+  val PROCTIME_STREAM_MARKER: Int = -2
+
+  val ROWTIME_BATCH_MARKER: Int = -3
+  val PROCTIME_BATCH_MARKER: Int = -4
 
   val ROWTIME_INDICATOR = new TimeIndicatorTypeInfo(true)
   val PROCTIME_INDICATOR = new TimeIndicatorTypeInfo(false)
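
The stream/batch split of the markers matters because computeIndexMapping (see TableSourceUtil above) emits them into an otherwise physical field mapping. A tiny illustrative snippet, reusing the hypothetical three-column schema from the earlier sketch:

  import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo._

  object MarkerExample extends App {
    // schema [rowtime, value, proctime] over a physical type (t: Long, v: String):
    val mapping: Array[Int] = Array(ROWTIME_STREAM_MARKER, 1, PROCTIME_STREAM_MARKER)
    // only 'v' resolves to a physical input position
    assert(mapping.count(_ >= 0) == 1)
  }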

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
index a2356a6..6df00e7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
@@ -181,13 +181,17 @@ class ExternalCatalogTest extends TableTestBase {
     util.verifyTable(result, expected)
   }
 
-  def sourceBatchTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
+  def sourceBatchTableNode(
+      sourceTablePath: Array[String],
+      fields: Array[String]): String = {
     s"BatchTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
-        s"fields=[${fields.mkString(", ")}])"
+        s"fields=[${fields.mkString(", ")}], " +
+        s"source=[CsvTableSource(read fields: ${fields.mkString(", ")})])"
   }
 
   def sourceStreamTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
     s"StreamTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
-        s"fields=[${fields.mkString(", ")}])"
+      s"fields=[${fields.mkString(", ")}], " +
+      s"source=[CsvTableSource(read fields: ${fields.mkString(", ")})])"
   }
 }

