Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1BA1219909 for ; Thu, 24 Mar 2016 16:43:10 +0000 (UTC) Received: (qmail 5777 invoked by uid 500); 24 Mar 2016 16:43:10 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 5739 invoked by uid 500); 24 Mar 2016 16:43:10 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 5730 invoked by uid 99); 24 Mar 2016 16:43:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Mar 2016 16:43:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D8A11DFBAF; Thu, 24 Mar 2016 16:43:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vasia@apache.org To: commits@flink.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-3639] add methods for registering datasets and tables in the TableEnvironment Date: Thu, 24 Mar 2016 16:43:09 +0000 (UTC) Repository: flink Updated Branches: refs/heads/master 5108f6875 -> 2da562b42 [FLINK-3639] add methods for registering datasets and tables in the TableEnvironment This closes #1827 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2da562b4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2da562b4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2da562b4 Branch: refs/heads/master Commit: 2da562b429cb6479abd971ff2ccc8a990b43bf59 Parents: 5108f68 Author: vasia Authored: Mon Mar 21 15:22:58 2016 +0100 Committer: vasia Committed: Thu Mar 24 15:33:34 2016 +0100 ---------------------------------------------------------------------- .../api/java/table/JavaBatchTranslator.scala | 2 +- .../flink/api/java/table/TableEnvironment.scala | 38 +++-- .../api/scala/table/TableEnvironment.scala | 34 +++-- .../api/table/AbstractTableEnvironment.scala | 86 +++++++++++ .../flink/api/table/plan/PlanTranslator.scala | 69 +-------- .../api/table/plan/TranslationContext.scala | 132 ++++++++++++++--- .../api/table/plan/rules/FlinkRuleSets.scala | 3 + .../plan/rules/dataSet/DataSetScanRule.scala | 18 ++- .../api/table/plan/schema/TableTable.scala | 46 ++++++ .../org/apache/flink/api/table/table.scala | 2 + .../java/table/test/RegisterDataSetITCase.java | 142 +++++++++++++++++++ .../table/test/RegisterDataSetITCase.scala | 136 ++++++++++++++++++ .../table/test/utils/ExpressionEvaluator.scala | 2 +- .../src/test/scala/resources/testFilter0.out | 2 +- .../src/test/scala/resources/testFilter1.out | 2 +- .../src/test/scala/resources/testJoin0.out | 4 +- .../src/test/scala/resources/testJoin1.out | 4 +- .../src/test/scala/resources/testUnion0.out | 4 +- .../src/test/scala/resources/testUnion1.out | 4 +- 19 files changed, 616 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala index 1f4e803..028711b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala @@ -51,7 +51,7 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator { fieldNames ) - val tabName = TranslationContext.addDataSet(dataSetTable) + val tabName = TranslationContext.registerDataSetTable(dataSetTable) val relBuilder = TranslationContext.getRelBuilder // create table scan operator http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala index 938c778..e0d88a3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala @@ -20,7 +20,8 @@ package org.apache.flink.api.java.table import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.api.table.{TableConfig, Table} +import org.apache.flink.api.table.expressions.ExpressionParser +import org.apache.flink.api.table.{AbstractTableEnvironment, Table} /** * Environment for working with the Table API. @@ -28,14 +29,7 @@ import org.apache.flink.api.table.{TableConfig, Table} * This can be used to convert a [[DataSet]] to a [[Table]] and back again. You * can also use the provided methods to create a [[Table]] directly from a data source. */ -class TableEnvironment { - - private val config = new TableConfig() - - /** - * Returns the table config to define the runtime behavior of the Table API. - */ - def getConfig = config +class TableEnvironment extends AbstractTableEnvironment { /** * Transforms the given DataSet to a [[org.apache.flink.api.table.Table]]. @@ -87,5 +81,29 @@ class TableEnvironment { new JavaBatchTranslator(config).translate[T](table.relNode)(typeInfo) } -} + /** + * Registers a DataSet under a unique name, so that it can be used in SQL queries. + * The fields of the DataSet type are used to name the Table fields. + * @param name the Table name + * @param dataset the DataSet to register + */ + def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = { + registerDataSetInternal(name, dataset) + } + /** + * Registers a DataSet under a unique name, so that it can be used in SQL queries. + * The fields of the DataSet type are renamed to the given set of fields. + * + * @param name the Table name + * @param dataset the DataSet to register + * @param fields the Table field names + */ + def registerDataSet[T](name: String, dataset: DataSet[T], fields: String): Unit = { + val exprs = ExpressionParser + .parseExpressionList(fields) + .toArray + registerDataSetInternal(name, dataset, exprs) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala index 705378a..9f71c63 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala @@ -20,7 +20,7 @@ package org.apache.flink.api.scala.table import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala.DataSet import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.api.table.{TableConfig, Table} +import org.apache.flink.api.table.{AbstractTableEnvironment, Table} /** * Environment for working with the Table API. @@ -28,14 +28,7 @@ import org.apache.flink.api.table.{TableConfig, Table} * This can be used to convert a [[DataSet]] to a [[Table]] and back again. You * can also use the provided methods to create a [[Table]] directly from a data source. */ -class TableEnvironment { - - private val config = new TableConfig() - - /** - * Returns the table config to define the runtime behavior of the Table API. - */ - def getConfig = config +class TableEnvironment extends AbstractTableEnvironment { /** * Converts the [[DataSet]] to a [[Table]]. The field names can be specified like this: @@ -72,5 +65,26 @@ class TableEnvironment { new ScalaBatchTranslator(config).translate[T](table.relNode) } -} + /** + * Registers a DataSet under a unique name, so that it can be used in SQL queries. + * The fields of the DataSet type are used to name the Table fields. + * @param name the Table name + * @param dataset the DataSet to register + */ + def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = { + registerDataSetInternal(name, dataset.javaSet) + } + /** + * Registers a DataSet under a unique name, so that it can be used in SQL queries. + * The fields of the DataSet type are renamed to the given set of fields. + * + * @param name the Table name + * @param dataset the DataSet to register + * @param fields the field names expression + */ + def registerDataSet[T](name: String, dataset: DataSet[T], fields: Expression*): Unit = { + registerDataSetInternal(name, dataset.javaSet, fields.toArray) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala new file mode 100644 index 0000000..4dedc47 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table + +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.expressions.Expression +import org.apache.flink.api.table.plan.TranslationContext +import org.apache.flink.api.table.plan.schema.{DataSetTable, TableTable} + +class AbstractTableEnvironment { + + private[flink] val config = new TableConfig() + + /** + * Returns the table config to define the runtime behavior of the Table API. + */ + def getConfig = config + + /** + * Registers a Table under a unique name, so that it can be used in SQL queries. + * @param name the Table name + * @param table the Table to register + */ + def registerTable[T](name: String, table: Table): Unit = { + val tableTable = new TableTable(table.getRelNode()) + TranslationContext.registerTable(tableTable, name) + } + + /** + * Retrieve a registered Table. + * @param tableName the name under which the Table has been registered + * @return the Table object + */ + @throws[TableException] + def scan(tableName: String): Table = { + if (TranslationContext.isRegistered(tableName)) { + val relBuilder = TranslationContext.getRelBuilder + relBuilder.scan(tableName) + new Table(relBuilder.build(), relBuilder) + } + else { + throw new TableException(s"Table \'$tableName\' was not found in the registry.") + } + } + + private[flink] def registerDataSetInternal[T](name: String, dataset: DataSet[T]): Unit = { + + val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](dataset.getType) + val dataSetTable = new DataSetTable[T]( + dataset, + fieldIndexes, + fieldNames + ) + TranslationContext.registerTable(dataSetTable, name) + } + + private[flink] def registerDataSetInternal[T]( + name: String, dataset: DataSet[T], fields: Array[Expression]): Unit = { + + val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T]( + dataset.getType, fields.toArray) + + val dataSetTable = new DataSetTable[T]( + dataset, + fieldIndexes.toArray, + fieldNames.toArray + ) + TranslationContext.registerTable(dataSetTable, name) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala index f443155..410c570 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala @@ -18,11 +18,8 @@ package org.apache.flink.api.table.plan import org.apache.calcite.rel.RelNode -import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} -import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.api.table.expressions.{ExpressionParser, Naming, Expression, UnresolvedFieldReference} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.expressions.{ExpressionParser, Expression} import org.apache.flink.api.table.Table import scala.language.reflectiveCalls @@ -53,15 +50,7 @@ abstract class PlanTranslator { */ def createTable[A](repr: Representation[A]): Table = { - val fieldNames: Array[String] = repr.getType() match { - case t: TupleTypeInfo[A] => t.getFieldNames - case c: CaseClassTypeInfo[A] => c.getFieldNames - case p: PojoTypeInfo[A] => p.getFieldNames - case tpe => - throw new IllegalArgumentException( - s"Type $tpe requires explicit field naming with AS.") - } - val fieldIndexes = fieldNames.indices.toArray + val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo(repr.getType()) createTable(repr, fieldIndexes, fieldNames) } @@ -86,57 +75,7 @@ abstract class PlanTranslator { val inputType = repr.getType() - val indexedNames: Array[(Int, String)] = inputType match { - case a: AtomicType[A] => - if (exprs.length != 1) { - throw new IllegalArgumentException("Atomic type may can only have a single field.") - } - exprs.map { - case UnresolvedFieldReference(name) => (0, name) - case _ => throw new IllegalArgumentException( - "Field reference expression expected.") - } - case t: TupleTypeInfo[A] => - exprs.zipWithIndex.map { - case (UnresolvedFieldReference(name), idx) => (idx, name) - case (Naming(UnresolvedFieldReference(origName), name), _) => - val idx = t.getFieldIndex(origName) - if (idx < 0) { - throw new IllegalArgumentException(s"$origName is not a field of type $t") - } - (idx, name) - case _ => throw new IllegalArgumentException( - "Field reference expression or naming expression expected.") - } - case c: CaseClassTypeInfo[A] => - exprs.zipWithIndex.map { - case (UnresolvedFieldReference(name), idx) => (idx, name) - case (Naming(UnresolvedFieldReference(origName), name), _) => - val idx = c.getFieldIndex(origName) - if (idx < 0) { - throw new IllegalArgumentException(s"$origName is not a field of type $c") - } - (idx, name) - case _ => throw new IllegalArgumentException( - "Field reference expression or naming expression expected.") - } - case p: PojoTypeInfo[A] => - exprs.map { - case Naming(UnresolvedFieldReference(origName), name) => - val idx = p.getFieldIndex(origName) - if (idx < 0) { - throw new IllegalArgumentException(s"$origName is not a field of type $p") - } - (idx, name) - case _ => throw new IllegalArgumentException( - "Field naming expression expected.") - } - case tpe => throw new IllegalArgumentException( - s"Type $tpe cannot be converted into Table.") - } - - val (fieldIndexes, fieldNames) = indexedNames.unzip - + val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo(repr.getType(), exprs) createTable(repr, fieldIndexes.toArray, fieldNames.toArray) } http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala index 9acc7ba..330fe6b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala @@ -26,15 +26,20 @@ import org.apache.calcite.schema.impl.AbstractTable import org.apache.calcite.schema.SchemaPlus import org.apache.calcite.sql.parser.SqlParser import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder} +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.apache.flink.api.table.TableException +import org.apache.flink.api.table.expressions.{Naming, UnresolvedFieldReference, Expression} import org.apache.flink.api.table.plan.cost.DataSetCostFactory -import org.apache.flink.api.table.plan.schema.DataSetTable +import org.apache.flink.api.table.plan.schema.{TableTable, DataSetTable} object TranslationContext { private var frameworkConfig: FrameworkConfig = null private var relBuilder: RelBuilder = null private var tables: SchemaPlus = null - private var tabNames: Map[AbstractTable, String] = null + private var tablesRegistry: Map[String, AbstractTable] = null private val nameCntr: AtomicInteger = new AtomicInteger(0) reset() @@ -59,29 +64,55 @@ object TranslationContext { .traitDefs(ConventionTraitDef.INSTANCE) .build - tabNames = Map[AbstractTable, String]() - + tablesRegistry = Map[String, AbstractTable]() relBuilder = RelBuilder.create(frameworkConfig) - nameCntr.set(0) } - def addDataSet(newTable: DataSetTable[_]): String = { + /** + * Adds a table to the Calcite schema so it can be used by the Table API + */ + def registerDataSetTable(newTable: DataSetTable[_]): String = { + val tabName = "_DataSetTable_" + nameCntr.getAndIncrement() + tables.add(tabName, newTable) + tabName + } - // look up name - val tabName = tabNames.get(newTable) + /** + * Adds a table to the Calcite schema and the tables registry, + * so it can be used by both Table API and SQL statements. + */ + @throws[TableException] + def registerTable(table: AbstractTable, name: String): Unit = { + val illegalPattern = "^_DataSetTable_[0-9]+$".r + val m = illegalPattern.findFirstIn(name) + m match { + case Some(_) => + throw new TableException(s"Illegal Table name. " + + s"Please choose a name that does not contain the pattern $illegalPattern") + case None => { + val existingTable = tablesRegistry.get(name) + existingTable match { + case Some(_) => + throw new TableException(s"Table \'$name\' already exists. " + + s"Please, choose a different name.") + case None => + tablesRegistry += (name -> table) + tables.add(name, table) + } + } + } + } - tabName match { - case Some(name) => - name + def isRegistered(name: String): Boolean = { + val table = tablesRegistry.get(name) + table match { + case Some(_) => + true case None => - val tabName = "DataSetTable_" + nameCntr.getAndIncrement() - tabNames += (newTable -> tabName) - tables.add(tabName, newTable) - tabName + false } - } def getUniqueName: String = { @@ -96,6 +127,75 @@ object TranslationContext { frameworkConfig } + def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = { + val fieldNames: Array[String] = inputType match { + case t: TupleTypeInfo[A] => t.getFieldNames + case c: CaseClassTypeInfo[A] => c.getFieldNames + case p: PojoTypeInfo[A] => p.getFieldNames + case tpe => + throw new IllegalArgumentException( + s"Type $tpe requires explicit field naming with AS.") + } + val fieldIndexes = fieldNames.indices.toArray + (fieldNames, fieldIndexes) + } + + def getFieldInfo[A]( + inputType: TypeInformation[A], + exprs: Array[Expression]): (Array[String], Array[Int]) = { + + val indexedNames: Array[(Int, String)] = inputType match { + case a: AtomicType[A] => + if (exprs.length != 1) { + throw new IllegalArgumentException("Atomic type may can only have a single field.") + } + exprs.map { + case UnresolvedFieldReference(name) => (0, name) + case _ => throw new IllegalArgumentException( + "Field reference expression expected.") + } + case t: TupleTypeInfo[A] => + exprs.zipWithIndex.map { + case (UnresolvedFieldReference(name), idx) => (idx, name) + case (Naming(UnresolvedFieldReference(origName), name), _) => + val idx = t.getFieldIndex(origName) + if (idx < 0) { + throw new IllegalArgumentException(s"$origName is not a field of type $t") + } + (idx, name) + case _ => throw new IllegalArgumentException( + "Field reference expression or naming expression expected.") + } + case c: CaseClassTypeInfo[A] => + exprs.zipWithIndex.map { + case (UnresolvedFieldReference(name), idx) => (idx, name) + case (Naming(UnresolvedFieldReference(origName), name), _) => + val idx = c.getFieldIndex(origName) + if (idx < 0) { + throw new IllegalArgumentException(s"$origName is not a field of type $c") + } + (idx, name) + case _ => throw new IllegalArgumentException( + "Field reference expression or naming expression expected.") + } + case p: PojoTypeInfo[A] => + exprs.map { + case Naming(UnresolvedFieldReference(origName), name) => + val idx = p.getFieldIndex(origName) + if (idx < 0) { + throw new IllegalArgumentException(s"$origName is not a field of type $p") + } + (idx, name) + case _ => throw new IllegalArgumentException( + "Field naming expression expected.") + } + case tpe => throw new IllegalArgumentException( + s"Type $tpe cannot be converted into Table.") + } + + val (fieldIndexes, fieldNames) = indexedNames.unzip + (fieldNames.toArray, fieldIndexes.toArray) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala index 94da6f8..b0815ef 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala @@ -29,6 +29,9 @@ object FlinkRuleSets { */ val DATASET_OPT_RULES: RuleSet = RuleSets.ofList( + // convert a logical table scan to a relational expression + TableScanRule.INSTANCE, + // push a filter into a join FilterJoinRule.FILTER_ON_JOIN, // push filter into the children of a join http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala index 2865d9f..3cdaca3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetScanRule.scala @@ -18,12 +18,13 @@ package org.apache.flink.api.table.plan.rules.dataSet -import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.core.TableScan import org.apache.calcite.rel.logical.LogicalTableScan import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSource} +import org.apache.flink.api.table.plan.schema.DataSetTable class DataSetScanRule extends ConverterRule( @@ -32,6 +33,21 @@ class DataSetScanRule DataSetConvention.INSTANCE, "FlinkScanRule") { + + /** + * If the input is not a DataSetTable, we want the TableScanRule to match instead + */ + override def matches(call: RelOptRuleCall): Boolean = { + val scan: TableScan = call.rel(0).asInstanceOf[TableScan] + val dataSetTable = scan.getTable.unwrap(classOf[DataSetTable[Any]]) + dataSetTable match { + case _: DataSetTable[Any] => + true + case _ => + false + } + } + def convert(rel: RelNode): RelNode = { val scan: TableScan = rel.asInstanceOf[TableScan] val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala new file mode 100644 index 0000000..d9a8cce --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.schema + +import org.apache.calcite.plan.RelOptTable +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.schema.Schema.TableType +import org.apache.calcite.schema.impl.AbstractTable +import org.apache.calcite.schema.TranslatableTable + +/** + * A [[org.apache.calcite.schema.Table]] implementation for registering + * Table API Tables in the Calcite schema to be used by Flink SQL. + * It implements [[TranslatableTable]] so that its logical scan + * can be converted to a relational expression. + * + * @see [[DataSetTable]] + */ +class TableTable(relNode: RelNode) extends AbstractTable with TranslatableTable { + + override def getJdbcTableType: TableType = ??? + + override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = relNode.getRowType + + override def toRel(context: ToRelContext, relOptTable: RelOptTable): RelNode = { + relNode + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index 0e480e8..53c3b4a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -73,6 +73,8 @@ class Table( extends BaseTable(relNode, relBuilder) { + def getRelNode(): RelNode = relNode + /** * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions * can contain complex expressions and aggregations. http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/RegisterDataSetITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/RegisterDataSetITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/RegisterDataSetITCase.java new file mode 100644 index 0000000..959fb90 --- /dev/null +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/RegisterDataSetITCase.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.table.test; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.TableEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.Table; +import org.apache.flink.api.table.TableException; +import org.apache.flink.api.table.plan.TranslationContext; +import org.apache.flink.api.table.test.utils.TableProgramsTestBase; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + +@RunWith(Parameterized.class) +public class RegisterDataSetITCase extends TableProgramsTestBase { + + public RegisterDataSetITCase(TestExecutionMode mode, TableConfigMode configMode) { + super(mode, configMode); + } + + @Test + public void testSimpleRegister() throws Exception { + final String tableName = "MyTable"; + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); + TranslationContext.reset(); + + DataSet> ds = CollectionDataSets.get3TupleDataSet(env); + tableEnv.registerDataSet(tableName, ds); + Table t = tableEnv.scan(tableName); + + Table result = t.select("f0, f1"); + + DataSet resultSet = tableEnv.toDataSet(result, Row.class); + List results = resultSet.collect(); + String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"; + compareResultAsText(results, expected); + } + + @Test + public void testRegisterWithFields() throws Exception { + final String tableName = "MyTable"; + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); + TranslationContext.reset(); + + DataSet> ds = CollectionDataSets.get3TupleDataSet(env); + tableEnv.registerDataSet(tableName, ds, "a, b, c"); + Table t = tableEnv.scan(tableName); + + Table result = t.select("a, b, c"); + + DataSet resultSet = tableEnv.toDataSet(result, Row.class); + List results = resultSet.collect(); + String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + + "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" + + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" + + "20,6,Comment#14\n" + "21,6,Comment#15\n"; + compareResultAsText(results, expected); + } + + @Test(expected = TableException.class) + public void testRegisterExistingDatasetTable() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); + TranslationContext.reset(); + + DataSet> ds = CollectionDataSets.get3TupleDataSet(env); + tableEnv.registerDataSet("MyTable", ds); + DataSet> ds2 = + CollectionDataSets.getSmall5TupleDataSet(env); + tableEnv.registerDataSet("MyTable", ds2); + } + + @Test(expected = TableException.class) + public void testScanUnregisteredTable() throws Exception { + TableEnvironment tableEnv = getJavaTableEnvironment(); + TranslationContext.reset(); + + tableEnv.scan("nonRegisteredTable"); + } + + @Test + public void testTableRegister() throws Exception { + final String tableName = "MyTable"; + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); + TranslationContext.reset(); + + DataSet> ds = CollectionDataSets.get3TupleDataSet(env); + Table t = tableEnv.fromDataSet(ds); + tableEnv.registerTable(tableName, t); + Table result = tableEnv.scan(tableName).select("f0, f1").filter("f0 > 7"); + + DataSet resultSet = tableEnv.toDataSet(result, Row.class); + List results = resultSet.collect(); + String expected = "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + + "13,5\n" + "14,5\n" + "15,5\n" + + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"; + compareResultAsText(results, expected); + } + + @Test(expected = TableException.class) + public void testIllegalName() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); + TranslationContext.reset(); + + DataSet> ds = CollectionDataSets.get3TupleDataSet(env); + Table t = tableEnv.fromDataSet(ds); + tableEnv.registerTable("_DataSetTable_42", t); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala new file mode 100644 index 0000000..535c064 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.api.table.{TableException, Row} +import org.apache.flink.api.table.plan.TranslationContext +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class RegisterDataSetITCase( + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testSimpleRegister(): Unit = { + + val tableName = "MyTable" + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val ds = CollectionDataSets.get3TupleDataSet(env) + tEnv.registerDataSet(tableName, ds) + val t = tEnv.scan(tableName).select('_1, '_2, '_3) + + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + val results = t.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testRegisterWithFields(): Unit = { + + val tableName = "MyTable" + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val ds = CollectionDataSets.get3TupleDataSet(env) + tEnv.registerDataSet(tableName, ds, 'a, 'b, 'c) + val t = tEnv.scan(tableName).select('a, 'b) + + val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + + "7,4\n" + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + + "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n" + val results = t.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[TableException]) + def testRegisterExistingDataSet(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val ds1 = CollectionDataSets.get3TupleDataSet(env) + tEnv.registerDataSet("MyTable", ds1) + val ds2 = CollectionDataSets.get5TupleDataSet(env) + tEnv.registerDataSet("MyTable", ds2) + } + + @Test(expected = classOf[TableException]) + def testScanUnregisteredTable(): Unit = { + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + tEnv.scan("someTable") + } + + @Test + def testTableRegister(): Unit = { + + val tableName = "MyTable" + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) + tEnv.registerTable(tableName, t) + + val regT = tEnv.scan(tableName).select('a, 'b).filter('a > 8) + + val expected = "9,4\n" + "10,4\n" + + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + + "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" + + "19,6\n" + "20,6\n" + "21,6\n" + + val results = regT.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[TableException]) + def testRegisterExistingTable(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val t1 = CollectionDataSets.get3TupleDataSet(env).toTable + tEnv.registerTable("MyTable", t1) + val t2 = CollectionDataSets.get5TupleDataSet(env).toTable + tEnv.registerDataSet("MyTable", t2) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala index d05ac0d..a52bbbd 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala @@ -49,7 +49,7 @@ object ExpressionEvaluator { // create DataSetTable val dataSetMock = mock(classOf[DataSet[Any]]) when(dataSetMock.getType).thenReturn(typeInfo) - val tableName = TranslationContext.addDataSet(new DataSetTable[Any]( + val tableName = TranslationContext.registerDataSetTable(new DataSetTable[Any]( dataSetMock, (0 until typeInfo.getArity).toArray, (0 until typeInfo.getArity).map("f" + _).toArray)) http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out index 1d0198d..b3786d9 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out @@ -1,6 +1,6 @@ == Abstract Syntax Tree == LogicalFilter(condition=[=(MOD($0, 2), 0)]) - LogicalTableScan(table=[[DataSetTable_0]]) + LogicalTableScan(table=[[_DataSetTable_0]]) == Physical Execution Plan == Stage 3 : Data Source http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out index ea76faa..1049466 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out @@ -1,6 +1,6 @@ == Abstract Syntax Tree == LogicalFilter(condition=[=(MOD($0, 2), 0)]) - LogicalTableScan(table=[[DataSetTable_0]]) + LogicalTableScan(table=[[_DataSetTable_0]]) == Physical Execution Plan == Stage 3 : Data Source http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out index 85b815d..5a60862 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out @@ -2,8 +2,8 @@ LogicalProject(a=[$0], c=[$2]) LogicalFilter(condition=[=($1, $3)]) LogicalJoin(condition=[true], joinType=[inner]) - LogicalTableScan(table=[[DataSetTable_0]]) - LogicalTableScan(table=[[DataSetTable_1]]) + LogicalTableScan(table=[[_DataSetTable_0]]) + LogicalTableScan(table=[[_DataSetTable_1]]) == Physical Execution Plan == Stage 4 : Data Source http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out index e88da82..1ca23c7 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out @@ -2,8 +2,8 @@ LogicalProject(a=[$0], c=[$2]) LogicalFilter(condition=[=($1, $3)]) LogicalJoin(condition=[true], joinType=[inner]) - LogicalTableScan(table=[[DataSetTable_0]]) - LogicalTableScan(table=[[DataSetTable_1]]) + LogicalTableScan(table=[[_DataSetTable_0]]) + LogicalTableScan(table=[[_DataSetTable_1]]) == Physical Execution Plan == Stage 4 : Data Source http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out index 8e892c6..d17517f 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out @@ -1,7 +1,7 @@ == Abstract Syntax Tree == LogicalUnion(all=[true]) - LogicalTableScan(table=[[DataSetTable_0]]) - LogicalTableScan(table=[[DataSetTable_1]]) + LogicalTableScan(table=[[_DataSetTable_0]]) + LogicalTableScan(table=[[_DataSetTable_1]]) == Physical Execution Plan == Stage 3 : Data Source http://git-wip-us.apache.org/repos/asf/flink/blob/2da562b4/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out index 34892b1..875f77b 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out @@ -1,7 +1,7 @@ == Abstract Syntax Tree == LogicalUnion(all=[true]) - LogicalTableScan(table=[[DataSetTable_0]]) - LogicalTableScan(table=[[DataSetTable_1]]) + LogicalTableScan(table=[[_DataSetTable_0]]) + LogicalTableScan(table=[[_DataSetTable_1]]) == Physical Execution Plan == Stage 3 : Data Source