flink-commits mailing list archives

From va...@apache.org
Subject flink git commit: [FLINK-3639] add methods for registering datasets and tables in the TableEnvironment
Date Thu, 24 Mar 2016 16:43:09 GMT
Repository: flink
Updated Branches:
  refs/heads/master 5108f6875 -> 2da562b42


[FLINK-3639] add methods for registering datasets and tables in the TableEnvironment

This closes #1827


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2da562b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2da562b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2da562b4

Branch: refs/heads/master
Commit: 2da562b429cb6479abd971ff2ccc8a990b43bf59
Parents: 5108f68
Author: vasia <vasia@apache.org>
Authored: Mon Mar 21 15:22:58 2016 +0100
Committer: vasia <vasia@apache.org>
Committed: Thu Mar 24 15:33:34 2016 +0100

----------------------------------------------------------------------
 .../api/java/table/JavaBatchTranslator.scala    |   2 +-
 .../flink/api/java/table/TableEnvironment.scala |  38 +++--
 .../api/scala/table/TableEnvironment.scala      |  34 +++--
 .../api/table/AbstractTableEnvironment.scala    |  86 +++++++++++
 .../flink/api/table/plan/PlanTranslator.scala   |  69 +--------
 .../api/table/plan/TranslationContext.scala     | 132 ++++++++++++++---
 .../api/table/plan/rules/FlinkRuleSets.scala    |   3 +
 .../plan/rules/dataSet/DataSetScanRule.scala    |  18 ++-
 .../api/table/plan/schema/TableTable.scala      |  46 ++++++
 .../org/apache/flink/api/table/table.scala      |   2 +
 .../java/table/test/RegisterDataSetITCase.java  | 142 +++++++++++++++++++
 .../table/test/RegisterDataSetITCase.scala      | 136 ++++++++++++++++++
 .../table/test/utils/ExpressionEvaluator.scala  |   2 +-
 .../src/test/scala/resources/testFilter0.out    |   2 +-
 .../src/test/scala/resources/testFilter1.out    |   2 +-
 .../src/test/scala/resources/testJoin0.out      |   4 +-
 .../src/test/scala/resources/testJoin1.out      |   4 +-
 .../src/test/scala/resources/testUnion0.out     |   4 +-
 .../src/test/scala/resources/testUnion1.out     |   4 +-
 19 files changed, 616 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
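
The patch introduces three user-facing entry points on the TableEnvironment: registerDataSet (with or without explicit field names), registerTable, and scan. A minimal, script-style sketch of the intended usage follows; the WC case class, the data values, and all variable names are illustrative, not part of the patch:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.Row

    case class WC(word: String, count: Int)

    object RegisterExample {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tEnv = new TableEnvironment

        val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))

        // field names are taken from the case class: word, count
        tEnv.registerDataSet("WordCounts", input)

        // look the table up again by name and query it
        val result = tEnv.scan("WordCounts").select('word, 'count)
        tEnv.toDataSet[Row](result).print()
      }
    }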


http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
index 1f4e803..028711b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -51,7 +51,7 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator {
       fieldNames
     )
 
-    val tabName = TranslationContext.addDataSet(dataSetTable)
+    val tabName = TranslationContext.registerDataSetTable(dataSetTable)
     val relBuilder = TranslationContext.getRelBuilder
 
     // create table scan operator

http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
index 938c778..e0d88a3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
@@ -20,7 +20,8 @@ package org.apache.flink.api.java.table
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.expressions.ExpressionParser
+import org.apache.flink.api.table.{AbstractTableEnvironment, Table}
 
 /**
  * Environment for working with the Table API.
@@ -28,14 +29,7 @@ import org.apache.flink.api.table.{TableConfig, Table}
  * This can be used to convert a [[DataSet]] to a [[Table]] and back again. You
  * can also use the provided methods to create a [[Table]] directly from a data source.
  */
-class TableEnvironment {
-
-  private val config = new TableConfig()
-
-  /**
-   * Returns the table config to define the runtime behavior of the Table API.
-   */
-  def getConfig = config
+class TableEnvironment extends AbstractTableEnvironment {
 
   /**
    * Transforms the given DataSet to a [[org.apache.flink.api.table.Table]].
@@ -87,5 +81,29 @@ class TableEnvironment {
     new JavaBatchTranslator(config).translate[T](table.relNode)(typeInfo)
   }
 
-}
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in SQL queries.
+   * The fields of the DataSet type are used to name the Table fields.
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = {
+    registerDataSetInternal(name, dataset)
+  }
 
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in SQL queries.
+   * The fields of the DataSet type are renamed to the given set of fields.
+   *
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   * @param fields the Table field names
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T], fields: String): Unit = {
+    val exprs = ExpressionParser
+      .parseExpressionList(fields)
+      .toArray
+    registerDataSetInternal(name, dataset, exprs)
+  }
+
+}
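
The String-based overload above parses the comma-separated field list with ExpressionParser before handing off to registerDataSetInternal. A small sketch of roughly that parsing step, assuming only the parseExpressionList call shown in the patch:

    import org.apache.flink.api.table.expressions.ExpressionParser

    // parse "a, b, c" into field expressions, as the overload does internally
    val fieldExprs = ExpressionParser.parseExpressionList("a, b, c").toArray
    fieldExprs.foreach(println) // one unresolved field reference per name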

http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala
index 705378a..9f71c63 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.table
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.DataSet
 import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.{TableConfig, Table}
+import org.apache.flink.api.table.{AbstractTableEnvironment, Table}
 
 /**
  * Environment for working with the Table API.
@@ -28,14 +28,7 @@ import org.apache.flink.api.table.{TableConfig, Table}
  * This can be used to convert a [[DataSet]] to a [[Table]] and back again. You
  * can also use the provided methods to create a [[Table]] directly from a data source.
  */
-class TableEnvironment {
-
-  private val config = new TableConfig()
-
-  /**
-   * Returns the table config to define the runtime behavior of the Table API.
-   */
-  def getConfig = config
+class TableEnvironment extends AbstractTableEnvironment {
 
   /**
    * Converts the [[DataSet]] to a [[Table]]. The field names can be specified like this:
@@ -72,5 +65,26 @@ class TableEnvironment {
      new ScalaBatchTranslator(config).translate[T](table.relNode)
   }
 
-}
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in SQL queries.
+   * The fields of the DataSet type are used to name the Table fields.
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = {
+    registerDataSetInternal(name, dataset.javaSet)
+  }
 
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in SQL queries.
+   * The fields of the DataSet type are renamed to the given set of fields.
+   *
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   * @param fields the field name expressions
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T], fields: Expression*): Unit = {
+    registerDataSetInternal(name, dataset.javaSet, fields.toArray)
+  }
+
+}
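
A script-style sketch of the Scala variant, which takes the field names as Expression varargs via the implicit Symbol conversions from org.apache.flink.api.scala.table._; the data and names are illustrative:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = new TableEnvironment

    // rename the tuple fields _1/_2 to id/name while registering
    val people = env.fromElements((1, "Alice"), (2, "Bob"))
    tEnv.registerDataSet("People", people, 'id, 'name)
    val t = tEnv.scan("People").select('id, 'name)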

http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala
new file mode 100644
index 0000000..4dedc47
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.plan.schema.{DataSetTable, TableTable}
+
+class AbstractTableEnvironment {
+
+  private[flink] val config = new TableConfig()
+
+  /**
+   * Returns the table config to define the runtime behavior of the Table API.
+   */
+  def getConfig = config
+
+  /**
+   * Registers a Table under a unique name, so that it can be used in SQL queries.
+   * @param name the Table name
+   * @param table the Table to register
+   */
+  def registerTable[T](name: String, table: Table): Unit = {
+    val tableTable = new TableTable(table.getRelNode())
+    TranslationContext.registerTable(tableTable, name)
+  }
+
+  /**
+   * Retrieve a registered Table.
+   * @param tableName the name under which the Table has been registered
+   * @return the Table object
+   */
+  @throws[TableException]
+  def scan(tableName: String): Table = {
+    if (TranslationContext.isRegistered(tableName)) {
+      val relBuilder = TranslationContext.getRelBuilder
+      relBuilder.scan(tableName)
+      new Table(relBuilder.build(), relBuilder)
+    }
+    else {
+      throw new TableException(s"Table \'$tableName\' was not found in the registry.")
+    }
+  }
+
+  private[flink] def registerDataSetInternal[T](name: String, dataset: DataSet[T]): Unit = {
+
+    val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](dataset.getType)
+    val dataSetTable = new DataSetTable[T](
+      dataset,
+      fieldIndexes,
+      fieldNames
+    )
+    TranslationContext.registerTable(dataSetTable, name)
+  }
+
+  private[flink] def registerDataSetInternal[T](
+      name: String, dataset: DataSet[T], fields: Array[Expression]): Unit = {
+
+    val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](
+      dataset.getType, fields.toArray)
+
+    val dataSetTable = new DataSetTable[T](
+      dataset,
+      fieldIndexes.toArray,
+      fieldNames.toArray
+    )
+    TranslationContext.registerTable(dataSetTable, name)
+  }
+}
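
A sketch of the registerTable/scan round trip added here, assuming the implicit as() conversion on DataSet that the tests below also use; names and data are illustrative:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.TableException

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = new TableEnvironment

    // register a derived Table (rather than a raw DataSet) under a name
    val t = env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello")).as('a, 'b, 'c)
    tEnv.registerTable("MyTable", t)

    // scan() resolves the name through the registry; unknown names fail fast
    val found = tEnv.scan("MyTable")
    try tEnv.scan("noSuchTable")
    catch { case e: TableException => println(e.getMessage) }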

http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
index f443155..410c570 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
@@ -18,11 +18,8 @@
 package org.apache.flink.api.table.plan
 
 import org.apache.calcite.rel.RelNode
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.table.expressions.{ExpressionParser, Naming, Expression, UnresolvedFieldReference}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.expressions.{ExpressionParser, Expression}
 import org.apache.flink.api.table.Table
 
 import scala.language.reflectiveCalls
@@ -53,15 +50,7 @@ abstract class PlanTranslator {
    */
   def createTable[A](repr: Representation[A]): Table = {
 
-    val fieldNames: Array[String] = repr.getType() match {
-      case t: TupleTypeInfo[A] => t.getFieldNames
-      case c: CaseClassTypeInfo[A] => c.getFieldNames
-      case p: PojoTypeInfo[A] => p.getFieldNames
-      case tpe =>
-        throw new IllegalArgumentException(
-          s"Type $tpe requires explicit field naming with AS.")
-    }
-    val fieldIndexes = fieldNames.indices.toArray
+    val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo(repr.getType())
     createTable(repr, fieldIndexes, fieldNames)
   }
 
@@ -86,57 +75,7 @@ abstract class PlanTranslator {
 
     val inputType = repr.getType()
 
-    val indexedNames: Array[(Int, String)] = inputType match {
-      case a: AtomicType[A] =>
-        if (exprs.length != 1) {
-          throw new IllegalArgumentException("Atomic type may can only have a single field.")
-        }
-        exprs.map {
-          case UnresolvedFieldReference(name) => (0, name)
-          case _ => throw new IllegalArgumentException(
-            "Field reference expression expected.")
-        }
-      case t: TupleTypeInfo[A] =>
-        exprs.zipWithIndex.map {
-          case (UnresolvedFieldReference(name), idx) => (idx, name)
-          case (Naming(UnresolvedFieldReference(origName), name), _) =>
-            val idx = t.getFieldIndex(origName)
-            if (idx < 0) {
-              throw new IllegalArgumentException(s"$origName is not a field of type $t")
-            }
-            (idx, name)
-          case _ => throw new IllegalArgumentException(
-            "Field reference expression or naming expression expected.")
-        }
-      case c: CaseClassTypeInfo[A] =>
-        exprs.zipWithIndex.map {
-          case (UnresolvedFieldReference(name), idx) => (idx, name)
-          case (Naming(UnresolvedFieldReference(origName), name), _) =>
-            val idx = c.getFieldIndex(origName)
-            if (idx < 0) {
-              throw new IllegalArgumentException(s"$origName is not a field of type $c")
-            }
-            (idx, name)
-          case _ => throw new IllegalArgumentException(
-            "Field reference expression or naming expression expected.")
-        }
-      case p: PojoTypeInfo[A] =>
-        exprs.map {
-          case Naming(UnresolvedFieldReference(origName), name) =>
-            val idx = p.getFieldIndex(origName)
-            if (idx < 0) {
-              throw new IllegalArgumentException(s"$origName is not a field of type $p")
-            }
-            (idx, name)
-          case _ => throw new IllegalArgumentException(
-            "Field naming expression expected.")
-        }
-      case tpe => throw new IllegalArgumentException(
-        s"Type $tpe cannot be converted into Table.")
-    }
-
-    val (fieldIndexes, fieldNames) = indexedNames.unzip
-
+    val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo(repr.getType(), exprs)
     createTable(repr, fieldIndexes.toArray, fieldNames.toArray)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
index 9acc7ba..330fe6b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
@@ -26,15 +26,20 @@ import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.schema.SchemaPlus
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder}
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.table.TableException
+import org.apache.flink.api.table.expressions.{Naming, UnresolvedFieldReference, Expression}
 import org.apache.flink.api.table.plan.cost.DataSetCostFactory
-import org.apache.flink.api.table.plan.schema.DataSetTable
+import org.apache.flink.api.table.plan.schema.{TableTable, DataSetTable}
 
 object TranslationContext {
 
   private var frameworkConfig: FrameworkConfig = null
   private var relBuilder: RelBuilder = null
   private var tables: SchemaPlus = null
-  private var tabNames: Map[AbstractTable, String] = null
+  private var tablesRegistry: Map[String, AbstractTable] = null
   private val nameCntr: AtomicInteger = new AtomicInteger(0)
 
   reset()
@@ -59,29 +64,55 @@ object TranslationContext {
       .traitDefs(ConventionTraitDef.INSTANCE)
       .build
 
-    tabNames = Map[AbstractTable, String]()
-
+    tablesRegistry = Map[String, AbstractTable]()
     relBuilder = RelBuilder.create(frameworkConfig)
-
     nameCntr.set(0)
 
   }
 
-  def addDataSet(newTable: DataSetTable[_]): String = {
+  /**
+   * Adds a table to the Calcite schema so it can be used by the Table API
+   */
+  def registerDataSetTable(newTable: DataSetTable[_]): String = {
+    val tabName = "_DataSetTable_" + nameCntr.getAndIncrement()
+    tables.add(tabName, newTable)
+    tabName
+  }
 
-    // look up name
-    val tabName = tabNames.get(newTable)
+  /**
+   * Adds a table to the Calcite schema and the tables registry,
+   * so it can be used by both Table API and SQL statements.
+   */
+  @throws[TableException]
+  def registerTable(table: AbstractTable, name: String): Unit = {
+    val illegalPattern = "^_DataSetTable_[0-9]+$".r
+    val m = illegalPattern.findFirstIn(name)
+    m match {
+      case Some(_) =>
+        throw new TableException(s"Illegal Table name. " +
+          s"Please choose a name that does not contain the pattern $illegalPattern")
+      case None => {
+        val existingTable = tablesRegistry.get(name)
+        existingTable match {
+          case Some(_) =>
+            throw new TableException(s"Table \'$name\' already exists. " +
+              s"Please, choose a different name.")
+          case None =>
+            tablesRegistry += (name -> table)
+            tables.add(name, table)
+        }
+      }
+    }
+  }
 
-    tabName match {
-      case Some(name) =>
-        name
+  def isRegistered(name: String): Boolean = {
+    val table = tablesRegistry.get(name)
+    table match {
+      case Some(_) =>
+        true
       case None =>
-        val tabName = "DataSetTable_" + nameCntr.getAndIncrement()
-        tabNames += (newTable -> tabName)
-        tables.add(tabName, newTable)
-        tabName
+        false
     }
-
   }
 
   def getUniqueName: String = {
@@ -96,6 +127,75 @@ object TranslationContext {
     frameworkConfig
   }
 
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
+    val fieldNames: Array[String] = inputType match {
+      case t: TupleTypeInfo[A] => t.getFieldNames
+      case c: CaseClassTypeInfo[A] => c.getFieldNames
+      case p: PojoTypeInfo[A] => p.getFieldNames
+      case tpe =>
+        throw new IllegalArgumentException(
+          s"Type $tpe requires explicit field naming with AS.")
+    }
+    val fieldIndexes = fieldNames.indices.toArray
+    (fieldNames, fieldIndexes)
+  }
+
+  def getFieldInfo[A](
+    inputType: TypeInformation[A],
+    exprs: Array[Expression]): (Array[String], Array[Int]) = {
+
+    val indexedNames: Array[(Int, String)] = inputType match {
+      case a: AtomicType[A] =>
+        if (exprs.length != 1) {
+          throw new IllegalArgumentException("Atomic type may can only have a single field.")
+        }
+        exprs.map {
+          case UnresolvedFieldReference(name) => (0, name)
+          case _ => throw new IllegalArgumentException(
+            "Field reference expression expected.")
+        }
+      case t: TupleTypeInfo[A] =>
+        exprs.zipWithIndex.map {
+          case (UnresolvedFieldReference(name), idx) => (idx, name)
+          case (Naming(UnresolvedFieldReference(origName), name), _) =>
+            val idx = t.getFieldIndex(origName)
+            if (idx < 0) {
+              throw new IllegalArgumentException(s"$origName is not a field of type $t")
+            }
+            (idx, name)
+          case _ => throw new IllegalArgumentException(
+            "Field reference expression or naming expression expected.")
+        }
+      case c: CaseClassTypeInfo[A] =>
+        exprs.zipWithIndex.map {
+          case (UnresolvedFieldReference(name), idx) => (idx, name)
+          case (Naming(UnresolvedFieldReference(origName), name), _) =>
+            val idx = c.getFieldIndex(origName)
+            if (idx < 0) {
+              throw new IllegalArgumentException(s"$origName is not a field of type $c")
+            }
+            (idx, name)
+          case _ => throw new IllegalArgumentException(
+            "Field reference expression or naming expression expected.")
+        }
+      case p: PojoTypeInfo[A] =>
+        exprs.map {
+          case Naming(UnresolvedFieldReference(origName), name) =>
+            val idx = p.getFieldIndex(origName)
+            if (idx < 0) {
+              throw new IllegalArgumentException(s"$origName is not a field of type $p")
+            }
+            (idx, name)
+          case _ => throw new IllegalArgumentException(
+            "Field naming expression expected.")
+        }
+      case tpe => throw new IllegalArgumentException(
+        s"Type $tpe cannot be converted into Table.")
+    }
+
+    val (fieldIndexes, fieldNames) = indexedNames.unzip
+    (fieldNames.toArray, fieldIndexes.toArray)
+  }
 }
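
The illegal-name guard in registerTable reserves the generated _DataSetTable_<n> namespace for internally registered DataSets. A standalone check of how that pattern behaves (values illustrative):

    val illegalPattern = "^_DataSetTable_[0-9]+$".r
    println(illegalPattern.findFirstIn("_DataSetTable_42")) // Some(...): rejected
    println(illegalPattern.findFirstIn("MyTable"))          // None: allowed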
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 94da6f8..b0815ef 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -29,6 +29,9 @@ object FlinkRuleSets {
     */
   val DATASET_OPT_RULES: RuleSet = RuleSets.ofList(
 
+    // convert a logical table scan to a relational expression
+    TableScanRule.INSTANCE,
+
     // push a filter into a join
     FilterJoinRule.FILTER_ON_JOIN,
     // push filter into the children of a join

http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
index 2865d9f..3cdaca3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala
@@ -18,12 +18,13 @@
 
 package org.apache.flink.api.table.plan.rules.dataSet
 
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.logical.LogicalTableScan
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSource}
+import org.apache.flink.api.table.plan.schema.DataSetTable
 
 class DataSetScanRule
   extends ConverterRule(
@@ -32,6 +33,21 @@ class DataSetScanRule
       DataSetConvention.INSTANCE,
       "FlinkScanRule")
   {
+
+  /**
+   * If the input is not a DataSetTable, we want the TableScanRule to match instead
+   */
+    override def matches(call: RelOptRuleCall): Boolean = {
+      val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
+      val dataSetTable = scan.getTable.unwrap(classOf[DataSetTable[Any]])
+      dataSetTable match {
+        case _: DataSetTable[Any] =>
+          true
+        case _ =>
+          false
+      }
+    }
+
     def convert(rel: RelNode): RelNode = {
       val scan: TableScan = rel.asInstanceOf[TableScan]
       val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
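
A note on the matches() override above: Calcite's RelOptTable.unwrap returns null when the table is not of the requested class, and a typed pattern in Scala never matches null, so a failed unwrap falls through to the false branch. A simplified stand-in demonstrating that behavior:

    // stand-in for `case _: DataSetTable[Any]`; null never matches a typed pattern
    def matchesStandIn(unwrapped: AnyRef): Boolean = unwrapped match {
      case _: String => true
      case _         => false
    }
    println(matchesStandIn("a DataSetTable stand-in")) // true
    println(matchesStandIn(null))                      // false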

http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala
new file mode 100644
index 0000000..d9a8cce
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.schema
+
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Schema.TableType
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.schema.TranslatableTable
+
+/**
+ * A [[org.apache.calcite.schema.Table]] implementation for registering
+ * Table API Tables in the Calcite schema to be used by Flink SQL.
+ * It implements [[TranslatableTable]] so that its logical scan
+ * can be converted to a relational expression.
+ *
+ * @see [[DataSetTable]]
+ */
+class TableTable(relNode: RelNode) extends AbstractTable with TranslatableTable {
+
+  override def getJdbcTableType: TableType = ???
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = relNode.getRowType
+
+  override def toRel(context: ToRelContext, relOptTable: RelOptTable): RelNode = {
+    relNode
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index 0e480e8..53c3b4a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -73,6 +73,8 @@ class Table(
   extends BaseTable(relNode, relBuilder)
 {
 
+  def getRelNode(): RelNode = relNode
+
   /**
     * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
     * can contain complex expressions and aggregations.

http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/RegisterDataSetITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/RegisterDataSetITCase.java
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/RegisterDataSetITCase.java
new file mode 100644
index 0000000..959fb90
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/RegisterDataSetITCase.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableException;
+import org.apache.flink.api.table.plan.TranslationContext;
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class RegisterDataSetITCase extends TableProgramsTestBase {
+
+	public RegisterDataSetITCase(TestExecutionMode mode, TableConfigMode configMode) {
+		super(mode, configMode);
+	}
+
+	@Test
+	public void testSimpleRegister() throws Exception {
+		final String tableName = "MyTable";
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = getJavaTableEnvironment();
+		TranslationContext.reset();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		tableEnv.registerDataSet(tableName, ds);
+		Table t = tableEnv.scan(tableName);
+
+		Table result = t.select("f0, f1");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+				"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testRegisterWithFields() throws Exception {
+		final String tableName = "MyTable";
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = getJavaTableEnvironment();
+		TranslationContext.reset();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		tableEnv.registerDataSet(tableName, ds, "a, b, c");
+		Table t = tableEnv.scan(tableName);
+
+		Table result = t.select("a, b, c");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+				"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+				"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+				"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+				"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+				"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+				"20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = TableException.class)
+	public void testRegisterExistingDatasetTable() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = getJavaTableEnvironment();
+		TranslationContext.reset();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		tableEnv.registerDataSet("MyTable", ds);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 =
+				CollectionDataSets.getSmall5TupleDataSet(env);
+		tableEnv.registerDataSet("MyTable", ds2);
+	}
+
+	@Test(expected = TableException.class)
+	public void testScanUnregisteredTable() throws Exception {
+		TableEnvironment tableEnv = getJavaTableEnvironment();
+		TranslationContext.reset();
+
+		tableEnv.scan("nonRegisteredTable");
+	}
+
+	@Test
+	public void testTableRegister() throws Exception {
+		final String tableName = "MyTable";
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = getJavaTableEnvironment();
+		TranslationContext.reset();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table t = tableEnv.fromDataSet(ds);
+		tableEnv.registerTable(tableName, t);
+		Table result = tableEnv.scan(tableName).select("f0, f1").filter("f0 > 7");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" +
+				"13,5\n" + "14,5\n" + "15,5\n" +
+				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = TableException.class)
+	public void testIllegalName() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = getJavaTableEnvironment();
+		TranslationContext.reset();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table t = tableEnv.fromDataSet(ds);
+		tableEnv.registerTable("_DataSetTable_42", t);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala
new file mode 100644
index 0000000..535c064
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.{TableException, Row}
+import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class RegisterDataSetITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testSimpleRegister(): Unit = {
+
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet(tableName, ds)
+    val t = tEnv.scan(tableName).select('_1, '_2, '_3)
+
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n"
+
+      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = t.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testRegisterWithFields(): Unit = {
+
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet(tableName, ds, 'a, 'b, 'c)
+    val t = tEnv.scan(tableName).select('a, 'b)
+
+    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" +
+      "7,4\n" + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n"
+
+      "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+    val results = t.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testRegisterExistingDataSet(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds1)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds2)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testScanUnregisteredTable(): Unit = {
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    tEnv.scan("someTable")
+  }
+
+  @Test
+  def testTableRegister(): Unit = {
+
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    tEnv.registerTable(tableName, t)
+
+    val regT = tEnv.scan(tableName).select('a, 'b).filter('a > 8)
+
+    val expected = "9,4\n" + "10,4\n" +
+      "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
+      "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" +
+      "19,6\n" + "20,6\n" + "21,6\n"
+
+    val results = regT.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testRegisterExistingTable(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val t1 = CollectionDataSets.get3TupleDataSet(env).toTable
+    tEnv.registerTable("MyTable", t1)
+    val t2 = CollectionDataSets.get5TupleDataSet(env).toTable
+    tEnv.registerDataSet("MyTable", t2)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
index d05ac0d..a52bbbd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
@@ -49,7 +49,7 @@ object ExpressionEvaluator {
     // create DataSetTable
     val dataSetMock = mock(classOf[DataSet[Any]])
     when(dataSetMock.getType).thenReturn(typeInfo)
-    val tableName = TranslationContext.addDataSet(new DataSetTable[Any](
+    val tableName = TranslationContext.registerDataSetTable(new DataSetTable[Any](
       dataSetMock,
       (0 until typeInfo.getArity).toArray,
       (0 until typeInfo.getArity).map("f" + _).toArray))

http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
index 1d0198d..b3786d9 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out
@@ -1,6 +1,6 @@
 == Abstract Syntax Tree ==
 LogicalFilter(condition=[=(MOD($0, 2), 0)])
-  LogicalTableScan(table=[[DataSetTable_0]])
+  LogicalTableScan(table=[[_DataSetTable_0]])
 
 == Physical Execution Plan ==
 Stage 3 : Data Source

http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
index ea76faa..1049466 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out
@@ -1,6 +1,6 @@
 == Abstract Syntax Tree ==
 LogicalFilter(condition=[=(MOD($0, 2), 0)])
-  LogicalTableScan(table=[[DataSetTable_0]])
+  LogicalTableScan(table=[[_DataSetTable_0]])
 
 == Physical Execution Plan ==
 Stage 3 : Data Source

http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
index 85b815d..5a60862 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out
@@ -2,8 +2,8 @@
 LogicalProject(a=[$0], c=[$2])
   LogicalFilter(condition=[=($1, $3)])
     LogicalJoin(condition=[true], joinType=[inner])
-      LogicalTableScan(table=[[DataSetTable_0]])
-      LogicalTableScan(table=[[DataSetTable_1]])
+      LogicalTableScan(table=[[_DataSetTable_0]])
+      LogicalTableScan(table=[[_DataSetTable_1]])
 
 == Physical Execution Plan ==
 Stage 4 : Data Source

http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
index e88da82..1ca23c7 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out
@@ -2,8 +2,8 @@
 LogicalProject(a=[$0], c=[$2])
   LogicalFilter(condition=[=($1, $3)])
     LogicalJoin(condition=[true], joinType=[inner])
-      LogicalTableScan(table=[[DataSetTable_0]])
-      LogicalTableScan(table=[[DataSetTable_1]])
+      LogicalTableScan(table=[[_DataSetTable_0]])
+      LogicalTableScan(table=[[_DataSetTable_1]])
 
 == Physical Execution Plan ==
 Stage 4 : Data Source

http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
index 8e892c6..d17517f 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
@@ -1,7 +1,7 @@
 == Abstract Syntax Tree ==
 LogicalUnion(all=[true])
-  LogicalTableScan(table=[[DataSetTable_0]])
-  LogicalTableScan(table=[[DataSetTable_1]])
+  LogicalTableScan(table=[[_DataSetTable_0]])
+  LogicalTableScan(table=[[_DataSetTable_1]])
 
 == Physical Execution Plan ==
 Stage 3 : Data Source

http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
index 34892b1..875f77b 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
@@ -1,7 +1,7 @@
 == Abstract Syntax Tree ==
 LogicalUnion(all=[true])
-  LogicalTableScan(table=[[DataSetTable_0]])
-  LogicalTableScan(table=[[DataSetTable_1]])
+  LogicalTableScan(table=[[_DataSetTable_0]])
+  LogicalTableScan(table=[[_DataSetTable_1]])
 
 == Physical Execution Plan ==
 Stage 3 : Data Source

