Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6D03D200BEE for ; Fri, 16 Dec 2016 16:46:40 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 6B980160B10; Fri, 16 Dec 2016 15:46:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8643B160B58 for ; Fri, 16 Dec 2016 16:46:34 +0100 (CET) Received: (qmail 78603 invoked by uid 500); 16 Dec 2016 15:46:33 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 77233 invoked by uid 99); 16 Dec 2016 15:46:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Dec 2016 15:46:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5A604F2DED; Fri, 16 Dec 2016 15:46:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: fhueske@apache.org To: commits@flink.apache.org Date: Fri, 16 Dec 2016 15:47:09 -0000 Message-Id: In-Reply-To: <530cbac4fe6344d0a479ffe65f9dcffb@git.apache.org> References: <530cbac4fe6344d0a479ffe65f9dcffb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [40/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table. archived-at: Fri, 16 Dec 2016 15:46:40 -0000 http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala deleted file mode 100644 index 6d00ab3..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala +++ /dev/null @@ -1,311 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table - -import java.util.concurrent.atomic.AtomicInteger - -import org.apache.calcite.plan.RelOptPlanner.CannotPlanException -import org.apache.calcite.plan.RelOptUtil -import org.apache.calcite.rel.RelNode -import org.apache.calcite.sql2rel.RelDecorrelator -import org.apache.calcite.tools.{Programs, RuleSet} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.io.DiscardingOutputFormat -import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} -import org.apache.flink.api.table.explain.PlanJsonParser -import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode} -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel} -import org.apache.flink.api.table.plan.rules.FlinkRuleSets -import org.apache.flink.api.table.plan.schema.{DataSetTable, TableSourceTable} -import org.apache.flink.api.table.sinks.{BatchTableSink, TableSink} -import org.apache.flink.api.table.sources.BatchTableSource -import org.apache.flink.types.Row - -/** - * The abstract base class for batch TableEnvironments. - * - * A TableEnvironment can be used to: - * - convert a [[DataSet]] to a [[Table]] - * - register a [[DataSet]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog - * - register a [[Table]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog - * - scan a registered table to obtain a [[Table]] - * - specify a SQL query on registered tables to obtain a [[Table]] - * - convert a [[Table]] into a [[DataSet]] - * - explain the AST and execution plan of a [[Table]] - * - * @param execEnv The [[ExecutionEnvironment]] which is wrapped in this [[BatchTableEnvironment]]. - * @param config The [[TableConfig]] of this [[BatchTableEnvironment]]. - */ -abstract class BatchTableEnvironment( - private[flink] val execEnv: ExecutionEnvironment, - config: TableConfig) - extends TableEnvironment(config) { - - // a counter for unique table names. - private val nameCntr: AtomicInteger = new AtomicInteger(0) - - // the naming pattern for internally registered tables. - private val internalNamePattern = "^_DataSetTable_[0-9]+$".r - - /** - * Checks if the chosen table name is valid. - * - * @param name The table name to check. - */ - override protected def checkValidTableName(name: String): Unit = { - val m = internalNamePattern.findFirstIn(name) - m match { - case Some(_) => - throw new TableException(s"Illegal Table name. " + - s"Please choose a name that does not contain the pattern $internalNamePattern") - case None => - } - } - - /** Returns a unique table name according to the internal naming pattern. */ - protected def createUniqueTableName(): String = "_DataSetTable_" + nameCntr.getAndIncrement() - - /** - * Scans a registered table and returns the resulting [[Table]]. - * - * The table to scan must be registered in the [[TableEnvironment]]'s catalog. - * - * @param tableName The name of the table to scan. - * @throws ValidationException if no table is registered under the given name. - * @return The scanned table. - */ - @throws[ValidationException] - def scan(tableName: String): Table = { - if (isRegistered(tableName)) { - new Table(this, CatalogNode(tableName, getRowType(tableName))) - } else { - throw new TableException(s"Table \'$tableName\' was not found in the registry.") - } - } - - /** - * Registers an external [[BatchTableSource]] in this [[TableEnvironment]]'s catalog. - * Registered tables can be referenced in SQL queries. - * - * @param name The name under which the [[BatchTableSource]] is registered. - * @param tableSource The [[BatchTableSource]] to register. - */ - def registerTableSource(name: String, tableSource: BatchTableSource[_]): Unit = { - - checkValidTableName(name) - registerTableInternal(name, new TableSourceTable(tableSource)) - } - - /** - * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]]. - * - * All tables referenced by the query must be registered in the TableEnvironment. - * - * @param query The SQL query to evaluate. - * @return The result of the query as Table. - */ - override def sql(query: String): Table = { - - val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) - // parse the sql query - val parsed = planner.parse(query) - // validate the sql query - val validated = planner.validate(parsed) - // transform to a relational tree - val relational = planner.rel(validated) - - new Table(this, LogicalRelNode(relational.rel)) - } - - /** - * Writes a [[Table]] to a [[TableSink]]. - * - * Internally, the [[Table]] is translated into a [[DataSet]] and handed over to the - * [[TableSink]] to write it. - * - * @param table The [[Table]] to write. - * @param sink The [[TableSink]] to write the [[Table]] to. - * @tparam T The expected type of the [[DataSet]] which represents the [[Table]]. - */ - override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = { - - sink match { - case batchSink: BatchTableSink[T] => - val outputType = sink.getOutputType - // translate the Table into a DataSet and provide the type that the TableSink expects. - val result: DataSet[T] = translate(table)(outputType) - // Give the DataSet to the TableSink to emit it. - batchSink.emitDataSet(result) - case _ => - throw new TableException("BatchTableSink required to emit batch Table") - } - } - - /** - * Returns the AST of the specified Table API and SQL queries and the execution plan to compute - * the result of the given [[Table]]. - * - * @param table The table for which the AST and execution plan will be returned. - * @param extended Flag to include detailed optimizer estimates. - */ - private[flink] def explain(table: Table, extended: Boolean): String = { - val ast = table.getRelNode - val optimizedPlan = optimize(ast) - val dataSet = translate[Row](optimizedPlan) (TypeExtractor.createTypeInfo(classOf[Row])) - dataSet.output(new DiscardingOutputFormat[Row]) - val env = dataSet.getExecutionEnvironment - val jasonSqlPlan = env.getExecutionPlan - val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended) - - s"== Abstract Syntax Tree ==" + - System.lineSeparator + - s"${RelOptUtil.toString(ast)}" + - System.lineSeparator + - s"== Optimized Logical Plan ==" + - System.lineSeparator + - s"${RelOptUtil.toString(optimizedPlan)}" + - System.lineSeparator + - s"== Physical Execution Plan ==" + - System.lineSeparator + - s"$sqlPlan" - } - - /** - * Returns the AST of the specified Table API and SQL queries and the execution plan to compute - * the result of the given [[Table]]. - * - * @param table The table for which the AST and execution plan will be returned. - */ - def explain(table: Table): String = explain(table: Table, extended = false) - - /** - * Registers a [[DataSet]] as a table under a given name in the [[TableEnvironment]]'s catalog. - * - * @param name The name under which the table is registered in the catalog. - * @param dataSet The [[DataSet]] to register as table in the catalog. - * @tparam T the type of the [[DataSet]]. - */ - protected def registerDataSetInternal[T](name: String, dataSet: DataSet[T]): Unit = { - - val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType) - val dataSetTable = new DataSetTable[T]( - dataSet, - fieldIndexes, - fieldNames - ) - registerTableInternal(name, dataSetTable) - } - - /** - * Registers a [[DataSet]] as a table under a given name with field names as specified by - * field expressions in the [[TableEnvironment]]'s catalog. - * - * @param name The name under which the table is registered in the catalog. - * @param dataSet The [[DataSet]] to register as table in the catalog. - * @param fields The field expressions to define the field names of the table. - * @tparam T The type of the [[DataSet]]. - */ - protected def registerDataSetInternal[T]( - name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = { - - val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType, fields) - val dataSetTable = new DataSetTable[T](dataSet, fieldIndexes, fieldNames) - registerTableInternal(name, dataSetTable) - } - - /** - * Returns the built-in rules that are defined by the environment. - */ - protected def getBuiltInRuleSet: RuleSet = FlinkRuleSets.DATASET_OPT_RULES - - /** - * Generates the optimized [[RelNode]] tree from the original relational node tree. - * - * @param relNode The original [[RelNode]] tree - * @return The optimized [[RelNode]] tree - */ - private[flink] def optimize(relNode: RelNode): RelNode = { - - // decorrelate - val decorPlan = RelDecorrelator.decorrelateQuery(relNode) - - // optimize the logical Flink plan - val optProgram = Programs.ofRules(getRuleSet) - val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify() - - val dataSetPlan = try { - optProgram.run(getPlanner, decorPlan, flinkOutputProps) - } catch { - case e: CannotPlanException => - throw new TableException( - s"Cannot generate a valid execution plan for the given query: \n\n" + - s"${RelOptUtil.toString(relNode)}\n" + - s"This exception indicates that the query uses an unsupported SQL feature.\n" + - s"Please check the documentation for the set of currently supported SQL features.") - case t: TableException => - throw new TableException( - s"Cannot generate a valid execution plan for the given query: \n\n" + - s"${RelOptUtil.toString(relNode)}\n" + - s"${t.msg}\n" + - s"Please check the documentation for the set of currently supported SQL features.") - case a: AssertionError => - throw a.getCause - } - dataSetPlan - } - - /** - * Translates a [[Table]] into a [[DataSet]]. - * - * The transformation involves optimizing the relational expression tree as defined by - * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators. - * - * @param table The root node of the relational expression tree. - * @param tpe The [[TypeInformation]] of the resulting [[DataSet]]. - * @tparam A The type of the resulting [[DataSet]]. - * @return The [[DataSet]] that corresponds to the translated [[Table]]. - */ - protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = { - val dataSetPlan = optimize(table.getRelNode) - translate(dataSetPlan) - } - - /** - * Translates a logical [[RelNode]] into a [[DataSet]]. - * - * @param logicalPlan The root node of the relational expression tree. - * @param tpe The [[TypeInformation]] of the resulting [[DataSet]]. - * @tparam A The type of the resulting [[DataSet]]. - * @return The [[DataSet]] that corresponds to the translated [[Table]]. - */ - protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = { - validateType(tpe) - - logicalPlan match { - case node: DataSetRel => - node.translateToPlan( - this, - Some(tpe.asInstanceOf[TypeInformation[Any]]) - ).asInstanceOf[DataSet[A]] - case _ => ??? - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala deleted file mode 100644 index 06b3edc..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table - -import org.apache.calcite.plan.RelOptRule -import org.apache.calcite.sql.SqlOperatorTable -import org.apache.calcite.sql.parser.SqlParser -import org.apache.calcite.sql.util.ChainedSqlOperatorTable -import org.apache.calcite.tools.{RuleSets, RuleSet} -import org.apache.flink.util.Preconditions - -import scala.collection.JavaConverters._ - -/** - * Builder for creating a Calcite configuration. - */ -class CalciteConfigBuilder { - private var replaceRules: Boolean = false - private var ruleSets: List[RuleSet] = Nil - - private var replaceOperatorTable: Boolean = false - private var operatorTables: List[SqlOperatorTable] = Nil - - private var replaceSqlParserConfig: Option[SqlParser.Config] = None - - /** - * Replaces the built-in rule set with the given rule set. - */ - def replaceRuleSet(replaceRuleSet: RuleSet): CalciteConfigBuilder = { - Preconditions.checkNotNull(replaceRuleSet) - ruleSets = List(replaceRuleSet) - replaceRules = true - this - } - - /** - * Appends the given rule set to the built-in rule set. - */ - def addRuleSet(addedRuleSet: RuleSet): CalciteConfigBuilder = { - Preconditions.checkNotNull(addedRuleSet) - ruleSets = addedRuleSet :: ruleSets - this - } - - /** - * Replaces the built-in SQL operator table with the given table. - */ - def replaceSqlOperatorTable(replaceSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = { - Preconditions.checkNotNull(replaceSqlOperatorTable) - operatorTables = List(replaceSqlOperatorTable) - replaceOperatorTable = true - this - } - - /** - * Appends the given table to the built-in SQL operator table. - */ - def addSqlOperatorTable(addedSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = { - Preconditions.checkNotNull(addedSqlOperatorTable) - this.operatorTables = addedSqlOperatorTable :: this.operatorTables - this - } - - /** - * Replaces the built-in SQL parser configuration with the given configuration. - */ - def replaceSqlParserConfig(sqlParserConfig: SqlParser.Config): CalciteConfigBuilder = { - Preconditions.checkNotNull(sqlParserConfig) - replaceSqlParserConfig = Some(sqlParserConfig) - this - } - - private class CalciteConfigImpl( - val getRuleSet: Option[RuleSet], - val replacesRuleSet: Boolean, - val getSqlOperatorTable: Option[SqlOperatorTable], - val replacesSqlOperatorTable: Boolean, - val getSqlParserConfig: Option[SqlParser.Config]) - extends CalciteConfig - - /** - * Builds a new [[CalciteConfig]]. - */ - def build(): CalciteConfig = new CalciteConfigImpl( - ruleSets match { - case Nil => None - case h :: Nil => Some(h) - case _ => - // concat rule sets - val concatRules = ruleSets.foldLeft(Nil: Iterable[RelOptRule])( (c, r) => r.asScala ++ c) - Some(RuleSets.ofList(concatRules.asJava)) - }, - this.replaceRules, - operatorTables match { - case Nil => None - case h :: Nil => Some(h) - case _ => - // chain operator tables - Some(operatorTables.reduce( (x, y) => ChainedSqlOperatorTable.of(x, y))) - }, - this.replaceOperatorTable, - replaceSqlParserConfig) -} - -/** - * Calcite configuration for defining a custom Calcite configuration for Table and SQL API. - */ -trait CalciteConfig { - /** - * Returns whether this configuration replaces the built-in rule set. - */ - def replacesRuleSet: Boolean - - /** - * Returns a custom rule set. - */ - def getRuleSet: Option[RuleSet] - - /** - * Returns whether this configuration replaces the built-in SQL operator table. - */ - def replacesSqlOperatorTable: Boolean - - /** - * Returns a custom SQL operator table. - */ - def getSqlOperatorTable: Option[SqlOperatorTable] - - /** - * Returns a custom SQL parser configuration. - */ - def getSqlParserConfig: Option[SqlParser.Config] -} - -object CalciteConfig { - - val DEFAULT = createBuilder().build() - - /** - * Creates a new builder for constructing a [[CalciteConfig]]. - */ - def createBuilder(): CalciteConfigBuilder = { - new CalciteConfigBuilder - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala deleted file mode 100644 index b1ccc09..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table - -import org.apache.calcite.adapter.java.JavaTypeFactory -import org.apache.calcite.prepare.CalciteCatalogReader -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.sql.{SqlInsert, SqlOperatorTable} -import org.apache.calcite.sql.validate.{SqlValidatorImpl, SqlConformance} - -/** - * This is a copy of Calcite's CalciteSqlValidator to use with [[FlinkPlannerImpl]]. - */ -class FlinkCalciteSqlValidator( - opTab: SqlOperatorTable, - catalogReader: CalciteCatalogReader, - typeFactory: JavaTypeFactory) extends SqlValidatorImpl( - opTab, catalogReader, typeFactory, SqlConformance.DEFAULT) { - - override def getLogicalSourceRowType( - sourceRowType: RelDataType, - insert: SqlInsert): RelDataType = { - typeFactory.asInstanceOf[JavaTypeFactory].toSql(sourceRowType) - } - - override def getLogicalTargetRowType( - targetRowType: RelDataType, - insert: SqlInsert): RelDataType = { - typeFactory.asInstanceOf[JavaTypeFactory].toSql(targetRowType) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala deleted file mode 100644 index 131cdc6..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table - -import java.util - -import com.google.common.collect.ImmutableList -import org.apache.calcite.jdbc.CalciteSchema -import org.apache.calcite.plan.RelOptTable.ViewExpander -import org.apache.calcite.plan._ -import org.apache.calcite.prepare.CalciteCatalogReader -import org.apache.calcite.rel.RelRoot -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rex.RexBuilder -import org.apache.calcite.schema.SchemaPlus -import org.apache.calcite.sql.parser.{SqlParseException => CSqlParseException, SqlParser} -import org.apache.calcite.sql.validate.SqlValidator -import org.apache.calcite.sql.{SqlNode, SqlOperatorTable} -import org.apache.calcite.sql2rel.{RelDecorrelator, SqlRexConvertletTable, SqlToRelConverter} -import org.apache.calcite.tools.{FrameworkConfig, RelConversionException} - -import scala.collection.JavaConversions._ - -/** - * NOTE: this is heavily inspired by Calcite's PlannerImpl. - * We need it in order to share the planner between the Table API relational plans - * and the SQL relation plans that are created by the Calcite parser. - * The main difference is that we do not create a new RelOptPlanner in the ready() method. - */ -class FlinkPlannerImpl( - config: FrameworkConfig, - planner: RelOptPlanner, - typeFactory: FlinkTypeFactory) { - - val operatorTable: SqlOperatorTable = config.getOperatorTable - /** Holds the trait definitions to be registered with planner. May be null. */ - val traitDefs: ImmutableList[RelTraitDef[_ <: RelTrait]] = config.getTraitDefs - val parserConfig: SqlParser.Config = config.getParserConfig - val convertletTable: SqlRexConvertletTable = config.getConvertletTable - val defaultSchema: SchemaPlus = config.getDefaultSchema - - var validator: FlinkCalciteSqlValidator = _ - var validatedSqlNode: SqlNode = _ - var root: RelRoot = _ - - private def ready() { - if (this.traitDefs != null) { - planner.clearRelTraitDefs() - for (traitDef <- this.traitDefs) { - planner.addRelTraitDef(traitDef) - } - } - } - - def parse(sql: String): SqlNode = { - try { - ready() - val parser: SqlParser = SqlParser.create(sql, parserConfig) - val sqlNode: SqlNode = parser.parseStmt - sqlNode - } catch { - case e: CSqlParseException => - throw SqlParserException(s"SQL parse failed. ${e.getMessage}", e) - } - } - - def validate(sqlNode: SqlNode): SqlNode = { - validator = new FlinkCalciteSqlValidator(operatorTable, createCatalogReader, typeFactory) - validator.setIdentifierExpansion(true) - try { - validatedSqlNode = validator.validate(sqlNode) - } - catch { - case e: RuntimeException => - throw new ValidationException(s"SQL validation failed. ${e.getMessage}", e) - } - validatedSqlNode - } - - def rel(sql: SqlNode): RelRoot = { - try { - assert(validatedSqlNode != null) - val rexBuilder: RexBuilder = createRexBuilder - val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder) - val config = SqlToRelConverter.configBuilder() - .withTrimUnusedFields(false).withConvertTableAccess(false).build() - val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter( - new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable, config) - root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true) - // we disable automatic flattening in order to let composite types pass without modification - // we might enable it again once Calcite has better support for structured types - // root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true)) - root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel)) - root - } catch { - case e: RelConversionException => throw TableException(e.getMessage) - } - } - - /** Implements [[org.apache.calcite.plan.RelOptTable.ViewExpander]] - * interface for [[org.apache.calcite.tools.Planner]]. */ - class ViewExpanderImpl extends ViewExpander { - - override def expandView( - rowType: RelDataType, - queryString: String, - schemaPath: util.List[String], - viewPath: util.List[String]): RelRoot = { - - val parser: SqlParser = SqlParser.create(queryString, parserConfig) - var sqlNode: SqlNode = null - try { - sqlNode = parser.parseQuery - } - catch { - case e: CSqlParseException => - throw SqlParserException(s"SQL parse failed. ${e.getMessage}", e) - } - val catalogReader: CalciteCatalogReader = createCatalogReader.withSchemaPath(schemaPath) - val validator: SqlValidator = - new FlinkCalciteSqlValidator(operatorTable, catalogReader, typeFactory) - validator.setIdentifierExpansion(true) - val validatedSqlNode: SqlNode = validator.validate(sqlNode) - val rexBuilder: RexBuilder = createRexBuilder - val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder) - val config: SqlToRelConverter.Config = SqlToRelConverter.configBuilder - .withTrimUnusedFields(false).withConvertTableAccess(false).build - val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter( - new ViewExpanderImpl, validator, catalogReader, cluster, convertletTable, config) - root = sqlToRelConverter.convertQuery(validatedSqlNode, true, false) - root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true)) - root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel)) - FlinkPlannerImpl.this.root - } - } - - private def createCatalogReader: CalciteCatalogReader = { - val rootSchema: SchemaPlus = FlinkPlannerImpl.rootSchema(defaultSchema) - new CalciteCatalogReader( - CalciteSchema.from(rootSchema), - parserConfig.caseSensitive, - CalciteSchema.from(defaultSchema).path(null), - typeFactory) - } - - private def createRexBuilder: RexBuilder = { - new RexBuilder(typeFactory) - } - -} - -object FlinkPlannerImpl { - private def rootSchema(schema: SchemaPlus): SchemaPlus = { - if (schema.getParentSchema == null) { - schema - } - else { - rootSchema(schema.getParentSchema) - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala deleted file mode 100644 index 8508e53..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table - -import java.util.Collections - -import org.apache.calcite.plan.volcano.VolcanoPlanner -import java.lang.Iterable - -import org.apache.calcite.jdbc.CalciteSchema -import org.apache.calcite.plan._ -import org.apache.calcite.prepare.CalciteCatalogReader -import org.apache.calcite.rel.logical.LogicalAggregate -import org.apache.calcite.rex.RexBuilder -import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey} -import org.apache.calcite.tools.{FrameworkConfig, RelBuilder} -import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty -import org.apache.flink.api.table.expressions.WindowProperty -import org.apache.flink.api.table.plan.logical.LogicalWindow -import org.apache.flink.api.table.plan.logical.rel.LogicalWindowAggregate - -/** - * Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]]. - */ -class FlinkRelBuilder( - context: Context, - relOptCluster: RelOptCluster, - relOptSchema: RelOptSchema) - extends RelBuilder( - context, - relOptCluster, - relOptSchema) { - - def getPlanner: RelOptPlanner = cluster.getPlanner - - def getCluster: RelOptCluster = relOptCluster - - override def getTypeFactory: FlinkTypeFactory = - super.getTypeFactory.asInstanceOf[FlinkTypeFactory] - - def aggregate( - window: LogicalWindow, - groupKey: GroupKey, - namedProperties: Seq[NamedWindowProperty], - aggCalls: Iterable[AggCall]) - : RelBuilder = { - // build logical aggregate - val aggregate = super.aggregate(groupKey, aggCalls).build().asInstanceOf[LogicalAggregate] - - // build logical window aggregate from it - push(LogicalWindowAggregate.create(window, namedProperties, aggregate)) - this - } - -} - -object FlinkRelBuilder { - - def create(config: FrameworkConfig): FlinkRelBuilder = { - - // create Flink type factory - val typeSystem = config.getTypeSystem - val typeFactory = new FlinkTypeFactory(typeSystem) - - // create context instances with Flink type factory - val planner = new VolcanoPlanner(config.getCostFactory, Contexts.empty()) - planner.setExecutor(config.getExecutor) - planner.addRelTraitDef(ConventionTraitDef.INSTANCE) - val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory)) - val calciteSchema = CalciteSchema.from(config.getDefaultSchema) - val relOptSchema = new CalciteCatalogReader( - calciteSchema, - config.getParserConfig.caseSensitive(), - Collections.emptyList(), - typeFactory) - - new FlinkRelBuilder(config.getContext, cluster, relOptSchema) - } - - /** - * Information necessary to create a window aggregate. - * - * Similar to [[RelBuilder.AggCall]] or [[RelBuilder.GroupKey]]. - */ - case class NamedWindowProperty(name: String, property: WindowProperty) - -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala deleted file mode 100644 index 8dcd660..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table - -import org.apache.calcite.avatica.util.TimeUnit -import org.apache.calcite.jdbc.JavaTypeFactoryImpl -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem} -import org.apache.calcite.sql.SqlIntervalQualifier -import org.apache.calcite.sql.`type`.SqlTypeName -import org.apache.calcite.sql.`type`.SqlTypeName._ -import org.apache.calcite.sql.parser.SqlParserPos -import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation} -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo -import org.apache.flink.api.java.typeutils.ValueTypeInfo._ -import org.apache.flink.api.table.FlinkTypeFactory.typeInfoToSqlTypeName -import org.apache.flink.api.table.plan.schema.{ArrayRelDataType, CompositeRelDataType, GenericRelDataType} -import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo -import org.apache.flink.api.table.typeutils.TypeCheckUtils.isSimple - -import scala.collection.mutable - -/** - * Flink specific type factory that represents the interface between Flink's [[TypeInformation]] - * and Calcite's [[RelDataType]]. - */ -class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) { - - // NOTE: for future data types it might be necessary to - // override more methods of RelDataTypeFactoryImpl - - private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]() - - def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType = { - // simple type can be converted to SQL types and vice versa - if (isSimple(typeInfo)) { - val sqlType = typeInfoToSqlTypeName(typeInfo) - sqlType match { - - case INTERVAL_YEAR_MONTH => - createSqlIntervalType( - new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO)) - - case INTERVAL_DAY_SECOND => - createSqlIntervalType( - new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO)) - - case _ => - createSqlType(sqlType) - } - } - // advanced types require specific RelDataType - // for storing the original TypeInformation - else { - seenTypes.getOrElseUpdate(typeInfo, canonize(createAdvancedType(typeInfo))) - } - } - - /** - * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory - * - * @param fieldNames field names - * @param fieldTypes field types, every element is Flink's [[TypeInformation]] - * @return a struct type with the input fieldNames and input fieldTypes - */ - def buildRowDataType( - fieldNames: Array[String], - fieldTypes: Array[TypeInformation[_]]) - : RelDataType = { - val rowDataTypeBuilder = builder - fieldNames - .zip(fieldTypes) - .foreach { f => - rowDataTypeBuilder.add(f._1, createTypeFromTypeInfo(f._2)).nullable(true) - } - rowDataTypeBuilder.build - } - - override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = { - // it might happen that inferred VARCHAR types overflow as we set them to Int.MaxValue - // always set those to default value - if (typeName == VARCHAR && precision < 0) { - createSqlType(typeName, getTypeSystem.getDefaultPrecision(typeName)) - } else { - super.createSqlType(typeName, precision) - } - } - - override def createArrayType(elementType: RelDataType, maxCardinality: Long): RelDataType = - new ArrayRelDataType( - ObjectArrayTypeInfo.getInfoFor(FlinkTypeFactory.toTypeInfo(elementType)), - elementType, - true) - - private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match { - case ct: CompositeType[_] => - new CompositeRelDataType(ct, this) - - case pa: PrimitiveArrayTypeInfo[_] => - new ArrayRelDataType(pa, createTypeFromTypeInfo(pa.getComponentType), false) - - case oa: ObjectArrayTypeInfo[_, _] => - new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true) - - case ti: TypeInformation[_] => - new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem]) - - case ti@_ => - throw TableException(s"Unsupported type information: $ti") - } - - override def createTypeWithNullability( - relDataType: RelDataType, - nullable: Boolean) - : RelDataType = relDataType match { - case composite: CompositeRelDataType => - // at the moment we do not care about nullability - composite - case _ => - super.createTypeWithNullability(relDataType, nullable) - } -} - -object FlinkTypeFactory { - - private def typeInfoToSqlTypeName(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match { - case BOOLEAN_TYPE_INFO => BOOLEAN - case BYTE_TYPE_INFO => TINYINT - case SHORT_TYPE_INFO => SMALLINT - case INT_TYPE_INFO => INTEGER - case LONG_TYPE_INFO => BIGINT - case FLOAT_TYPE_INFO => FLOAT - case DOUBLE_TYPE_INFO => DOUBLE - case STRING_TYPE_INFO => VARCHAR - case BIG_DEC_TYPE_INFO => DECIMAL - - // temporal types - case SqlTimeTypeInfo.DATE => DATE - case SqlTimeTypeInfo.TIME => TIME - case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP - case TimeIntervalTypeInfo.INTERVAL_MONTHS => INTERVAL_YEAR_MONTH - case TimeIntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_SECOND - - case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO => - throw TableException("Character type is not supported.") - - case _@t => - throw TableException(s"Type is not supported: $t") - } - - def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match { - case BOOLEAN => BOOLEAN_TYPE_INFO - case TINYINT => BYTE_TYPE_INFO - case SMALLINT => SHORT_TYPE_INFO - case INTEGER => INT_TYPE_INFO - case BIGINT => LONG_TYPE_INFO - case FLOAT => FLOAT_TYPE_INFO - case DOUBLE => DOUBLE_TYPE_INFO - case VARCHAR | CHAR => STRING_TYPE_INFO - case DECIMAL => BIG_DEC_TYPE_INFO - - // temporal types - case DATE => SqlTimeTypeInfo.DATE - case TIME => SqlTimeTypeInfo.TIME - case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP - case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => TimeIntervalTypeInfo.INTERVAL_MONTHS - case typeName if DAY_INTERVAL_TYPES.contains(typeName) => TimeIntervalTypeInfo.INTERVAL_MILLIS - - case NULL => - throw TableException("Type NULL is not supported. Null values must have a supported type.") - - // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING - // are represented as integer - case SYMBOL => INT_TYPE_INFO - - // extract encapsulated TypeInformation - case ANY if relDataType.isInstanceOf[GenericRelDataType] => - val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType] - genericRelDataType.typeInfo - - case ROW if relDataType.isInstanceOf[CompositeRelDataType] => - val compositeRelDataType = relDataType.asInstanceOf[CompositeRelDataType] - compositeRelDataType.compositeType - - // ROW and CURSOR for UDTF case, whose type info will never be used, just a placeholder - case ROW | CURSOR => new NothingTypeInfo - - case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] => - val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType] - arrayRelDataType.typeInfo - - case _@t => - throw TableException(s"Type is not supported: $t") - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala deleted file mode 100644 index 3222eee..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table - -import org.apache.calcite.rel.`type`.RelDataTypeSystemImpl -import org.apache.calcite.sql.`type`.SqlTypeName - -/** - * Custom type system for Flink. - */ -class FlinkTypeSystem extends RelDataTypeSystemImpl { - - // we cannot use Int.MaxValue because of an overflow in Calcites type inference logic - // half should be enough for all use cases - override def getMaxNumericScale: Int = Int.MaxValue / 2 - - // we cannot use Int.MaxValue because of an overflow in Calcites type inference logic - // half should be enough for all use cases - override def getMaxNumericPrecision: Int = Int.MaxValue / 2 - - override def getDefaultPrecision(typeName: SqlTypeName): Int = typeName match { - - // by default all VARCHARs can have the Java default length - case SqlTypeName.VARCHAR => - Int.MaxValue - - // we currenty support only timestamps with milliseconds precision - case SqlTypeName.TIMESTAMP => - 3 - - case _ => - super.getDefaultPrecision(typeName) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala deleted file mode 100644 index da20e07..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala +++ /dev/null @@ -1,351 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table - -import java.util.concurrent.atomic.AtomicInteger - -import org.apache.calcite.plan.RelOptPlanner.CannotPlanException -import org.apache.calcite.plan.RelOptUtil -import org.apache.calcite.rel.RelNode -import org.apache.calcite.sql2rel.RelDecorrelator -import org.apache.calcite.tools.{Programs, RuleSet} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.api.table.explain.PlanJsonParser -import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode} -import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel} -import org.apache.flink.api.table.plan.rules.FlinkRuleSets -import org.apache.flink.api.table.plan.schema.{DataStreamTable, TableSourceTable} -import org.apache.flink.api.table.sinks.{StreamTableSink, TableSink} -import org.apache.flink.api.table.sources.StreamTableSource -import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.apache.flink.types.Row - -/** - * The base class for stream TableEnvironments. - * - * A TableEnvironment can be used to: - * - convert [[DataStream]] to a [[Table]] - * - register a [[DataStream]] as a table in the catalog - * - register a [[Table]] in the catalog - * - scan a registered table to obtain a [[Table]] - * - specify a SQL query on registered tables to obtain a [[Table]] - * - convert a [[Table]] into a [[DataStream]] - * - * @param execEnv The [[StreamExecutionEnvironment]] which is wrapped in this - * [[StreamTableEnvironment]]. - * @param config The [[TableConfig]] of this [[StreamTableEnvironment]]. - */ -abstract class StreamTableEnvironment( - private[flink] val execEnv: StreamExecutionEnvironment, - config: TableConfig) - extends TableEnvironment(config) { - - // a counter for unique table names - private val nameCntr: AtomicInteger = new AtomicInteger(0) - - // the naming pattern for internally registered tables. - private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r - - /** - * Checks if the chosen table name is valid. - * - * @param name The table name to check. - */ - override protected def checkValidTableName(name: String): Unit = { - val m = internalNamePattern.findFirstIn(name) - m match { - case Some(_) => - throw new TableException(s"Illegal Table name. " + - s"Please choose a name that does not contain the pattern $internalNamePattern") - case None => - } - } - - /** Returns a unique table name according to the internal naming pattern. */ - protected def createUniqueTableName(): String = "_DataStreamTable_" + nameCntr.getAndIncrement() - - /** - * Returns field names and field positions for a given [[TypeInformation]]. - * - * Field names are automatically extracted for - * [[org.apache.flink.api.common.typeutils.CompositeType]]. - * The method fails if inputType is not a - * [[org.apache.flink.api.common.typeutils.CompositeType]]. - * - * @param inputType The TypeInformation extract the field names and positions from. - * @tparam A The type of the TypeInformation. - * @return A tuple of two arrays holding the field names and corresponding field positions. - */ - override protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]) - : (Array[String], Array[Int]) = { - val fieldInfo = super.getFieldInfo(inputType) - if (fieldInfo._1.contains("rowtime")) { - throw new TableException("'rowtime' ia a reserved field name in stream environment.") - } - fieldInfo - } - - /** - * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of - * [[Expression]]. - * - * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated. - * @param exprs The expressions that define the field names. - * @tparam A The type of the TypeInformation. - * @return A tuple of two arrays holding the field names and corresponding field positions. - */ - override protected[flink] def getFieldInfo[A]( - inputType: TypeInformation[A], - exprs: Array[Expression]) - : (Array[String], Array[Int]) = { - val fieldInfo = super.getFieldInfo(inputType, exprs) - if (fieldInfo._1.contains("rowtime")) { - throw new TableException("'rowtime' is a reserved field name in stream environment.") - } - fieldInfo - } - - /** - * Ingests a registered table and returns the resulting [[Table]]. - * - * The table to ingest must be registered in the [[TableEnvironment]]'s catalog. - * - * @param tableName The name of the table to ingest. - * @throws ValidationException if no table is registered under the given name. - * @return The ingested table. - */ - @throws[ValidationException] - def ingest(tableName: String): Table = { - - if (isRegistered(tableName)) { - new Table(this, CatalogNode(tableName, getRowType(tableName))) - } - else { - throw new ValidationException(s"Table \'$tableName\' was not found in the registry.") - } - } - - /** - * Registers an external [[StreamTableSource]] in this [[TableEnvironment]]'s catalog. - * Registered tables can be referenced in SQL queries. - * - * @param name The name under which the [[StreamTableSource]] is registered. - * @param tableSource The [[org.apache.flink.api.table.sources.StreamTableSource]] to register. - */ - def registerTableSource(name: String, tableSource: StreamTableSource[_]): Unit = { - - checkValidTableName(name) - registerTableInternal(name, new TableSourceTable(tableSource)) - } - - /** - * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]]. - * - * All tables referenced by the query must be registered in the TableEnvironment. - * - * @param query The SQL query to evaluate. - * @return The result of the query as Table. - */ - override def sql(query: String): Table = { - - val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) - // parse the sql query - val parsed = planner.parse(query) - // validate the sql query - val validated = planner.validate(parsed) - // transform to a relational tree - val relational = planner.rel(validated) - - new Table(this, LogicalRelNode(relational.rel)) - } - - /** - * Writes a [[Table]] to a [[TableSink]]. - * - * Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the - * [[TableSink]] to write it. - * - * @param table The [[Table]] to write. - * @param sink The [[TableSink]] to write the [[Table]] to. - * @tparam T The expected type of the [[DataStream]] which represents the [[Table]]. - */ - override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = { - - sink match { - case streamSink: StreamTableSink[T] => - val outputType = sink.getOutputType - // translate the Table into a DataStream and provide the type that the TableSink expects. - val result: DataStream[T] = translate(table)(outputType) - // Give the DataSet to the TableSink to emit it. - streamSink.emitDataStream(result) - case _ => - throw new TableException("StreamTableSink required to emit streaming Table") - } - } - - /** - * Registers a [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s - * catalog. - * - * @param name The name under which the table is registered in the catalog. - * @param dataStream The [[DataStream]] to register as table in the catalog. - * @tparam T the type of the [[DataStream]]. - */ - protected def registerDataStreamInternal[T]( - name: String, - dataStream: DataStream[T]): Unit = { - - val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType) - val dataStreamTable = new DataStreamTable[T]( - dataStream, - fieldIndexes, - fieldNames - ) - registerTableInternal(name, dataStreamTable) - } - - /** - * Registers a [[DataStream]] as a table under a given name with field names as specified by - * field expressions in the [[TableEnvironment]]'s catalog. - * - * @param name The name under which the table is registered in the catalog. - * @param dataStream The [[DataStream]] to register as table in the catalog. - * @param fields The field expressions to define the field names of the table. - * @tparam T The type of the [[DataStream]]. - */ - protected def registerDataStreamInternal[T]( - name: String, - dataStream: DataStream[T], - fields: Array[Expression]): Unit = { - - val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields.toArray) - val dataStreamTable = new DataStreamTable[T]( - dataStream, - fieldIndexes.toArray, - fieldNames.toArray - ) - registerTableInternal(name, dataStreamTable) - } - - /** - * Returns the built-in rules that are defined by the environment. - */ - protected def getBuiltInRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_OPT_RULES - - /** - * Generates the optimized [[RelNode]] tree from the original relational node tree. - * - * @param relNode The root node of the relational expression tree. - * @return The optimized [[RelNode]] tree - */ - private[flink] def optimize(relNode: RelNode): RelNode = { - // decorrelate - val decorPlan = RelDecorrelator.decorrelateQuery(relNode) - - // optimize the logical Flink plan - val optProgram = Programs.ofRules(getRuleSet) - val flinkOutputProps = relNode.getTraitSet.replace(DataStreamConvention.INSTANCE).simplify() - - val dataStreamPlan = try { - optProgram.run(getPlanner, decorPlan, flinkOutputProps) - } - catch { - case e: CannotPlanException => - throw TableException( - s"Cannot generate a valid execution plan for the given query: \n\n" + - s"${RelOptUtil.toString(relNode)}\n" + - s"This exception indicates that the query uses an unsupported SQL feature.\n" + - s"Please check the documentation for the set of currently supported SQL features.", e) - } - dataStreamPlan - } - - - /** - * Translates a [[Table]] into a [[DataStream]]. - * - * The transformation involves optimizing the relational expression tree as defined by - * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators. - * - * @param table The root node of the relational expression tree. - * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. - * @tparam A The type of the resulting [[DataStream]]. - * @return The [[DataStream]] that corresponds to the translated [[Table]]. - */ - protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = { - val dataStreamPlan = optimize(table.getRelNode) - translate(dataStreamPlan) - } - - /** - * Translates a logical [[RelNode]] into a [[DataStream]]. - * - * @param logicalPlan The root node of the relational expression tree. - * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. - * @tparam A The type of the resulting [[DataStream]]. - * @return The [[DataStream]] that corresponds to the translated [[Table]]. - */ - protected def translate[A] - (logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataStream[A] = { - - validateType(tpe) - - logicalPlan match { - case node: DataStreamRel => - node.translateToPlan( - this, - Some(tpe.asInstanceOf[TypeInformation[Any]]) - ).asInstanceOf[DataStream[A]] - case _ => ??? - } - } - - /** - * Returns the AST of the specified Table API and SQL queries and the execution plan to compute - * the result of the given [[Table]]. - * - * @param table The table for which the AST and execution plan will be returned. - */ - def explain(table: Table): String = { - val ast = table.getRelNode - val optimizedPlan = optimize(ast) - val dataStream = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row])) - - val env = dataStream.getExecutionEnvironment - val jsonSqlPlan = env.getExecutionPlan - - val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false) - - s"== Abstract Syntax Tree ==" + - System.lineSeparator + - s"${RelOptUtil.toString(ast)}" + - System.lineSeparator + - s"== Optimized Logical Plan ==" + - System.lineSeparator + - s"${RelOptUtil.toString(optimizedPlan)}" + - System.lineSeparator + - s"== Physical Execution Plan ==" + - System.lineSeparator + - s"$sqlPlan" - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala deleted file mode 100644 index 37d9cb5..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table - -import java.util.TimeZone - -/** - * A config to define the runtime behavior of the Table API. - */ -class TableConfig { - - /** - * Defines the timezone for date/time/timestamp conversions. - */ - private var timeZone: TimeZone = TimeZone.getTimeZone("UTC") - - /** - * Defines if all fields need to be checked for NULL first. - */ - private var nullCheck: Boolean = true - - /** - * Defines if efficient types (such as Tuple types or Atomic types) - * should be used within operators where possible. - */ - private var efficientTypeUsage = false - - /** - * Defines the configuration of Calcite for Table API and SQL queries. - */ - private var calciteConfig = CalciteConfig.DEFAULT - - /** - * Sets the timezone for date/time/timestamp conversions. - */ - def setTimeZone(timeZone: TimeZone): Unit = { - require(timeZone != null, "timeZone must not be null.") - this.timeZone = timeZone - } - - /** - * Returns the timezone for date/time/timestamp conversions. - */ - def getTimeZone = timeZone - - /** - * Returns the NULL check. If enabled, all fields need to be checked for NULL first. - */ - def getNullCheck = nullCheck - - /** - * Sets the NULL check. If enabled, all fields need to be checked for NULL first. - */ - def setNullCheck(nullCheck: Boolean): Unit = { - this.nullCheck = nullCheck - } - - /** - * Returns the usage of efficient types. If enabled, efficient types (such as Tuple types - * or Atomic types) are used within operators where possible. - * - * NOTE: Currently, this is an experimental feature. - */ - def getEfficientTypeUsage = efficientTypeUsage - - /** - * Sets the usage of efficient types. If enabled, efficient types (such as Tuple types - * or Atomic types) are used within operators where possible. - * - * NOTE: Currently, this is an experimental feature. - */ - def setEfficientTypeUsage(efficientTypeUsage: Boolean): Unit = { - this.efficientTypeUsage = efficientTypeUsage - } - - /** - * Returns the current configuration of Calcite for Table API and SQL queries. - */ - def getCalciteConfig: CalciteConfig = calciteConfig - - /** - * Sets the configuration of Calcite for Table API and SQL queries. - * Changing the configuration has no effect after the first query has been defined. - */ - def setCalciteConfig(calciteConfig: CalciteConfig): Unit = { - this.calciteConfig = calciteConfig - } -} - -object TableConfig { - def DEFAULT = new TableConfig() -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala deleted file mode 100644 index 07ea860..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala +++ /dev/null @@ -1,537 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table - -import java.lang.reflect.Modifier -import java.util.concurrent.atomic.AtomicInteger - -import org.apache.calcite.config.Lex -import org.apache.calcite.plan.RelOptPlanner -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.schema.SchemaPlus -import org.apache.calcite.schema.impl.AbstractTable -import org.apache.calcite.sql.SqlOperatorTable -import org.apache.calcite.sql.parser.SqlParser -import org.apache.calcite.sql.util.ChainedSqlOperatorTable -import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets} -import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} -import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv} -import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo} -import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv} -import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv} -import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv} -import org.apache.flink.api.table.codegen.ExpressionReducer -import org.apache.flink.api.table.expressions.{Alias, Expression, UnresolvedFieldReference} -import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions} -import org.apache.flink.api.table.functions.{ScalarFunction, TableFunction} -import org.apache.flink.api.table.plan.cost.DataSetCostFactory -import org.apache.flink.api.table.plan.schema.RelTable -import org.apache.flink.api.table.sinks.TableSink -import org.apache.flink.api.table.validate.FunctionCatalog -import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv} -import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv} - -import scala.collection.JavaConverters._ - -/** - * The abstract base class for batch and stream TableEnvironments. - * - * @param config The configuration of the TableEnvironment - */ -abstract class TableEnvironment(val config: TableConfig) { - - // the catalog to hold all registered and translated tables - private val tables: SchemaPlus = Frameworks.createRootSchema(true) - - // Table API/SQL function catalog - private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns - - // the configuration to create a Calcite planner - private lazy val frameworkConfig: FrameworkConfig = Frameworks - .newConfigBuilder - .defaultSchema(tables) - .parserConfig(getSqlParserConfig) - .costFactory(new DataSetCostFactory) - .typeSystem(new FlinkTypeSystem) - .operatorTable(getSqlOperatorTable) - // set the executor to evaluate constant expressions - .executor(new ExpressionReducer(config)) - .build - - // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree. - protected lazy val relBuilder: FlinkRelBuilder = FlinkRelBuilder.create(frameworkConfig) - - // the planner instance used to optimize queries of this TableEnvironment - private lazy val planner: RelOptPlanner = relBuilder.getPlanner - - private lazy val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory - - // a counter for unique attribute names - private val attrNameCntr: AtomicInteger = new AtomicInteger(0) - - /** Returns the table config to define the runtime behavior of the Table API. */ - def getConfig = config - - /** - * Returns the operator table for this environment including a custom Calcite configuration. - */ - protected def getSqlOperatorTable: SqlOperatorTable = { - val calciteConfig = config.getCalciteConfig - calciteConfig.getSqlOperatorTable match { - - case None => - functionCatalog.getSqlOperatorTable - - case Some(table) => - if (calciteConfig.replacesSqlOperatorTable) { - table - } else { - ChainedSqlOperatorTable.of(functionCatalog.getSqlOperatorTable, table) - } - } - } - - /** - * Returns the rule set for this environment including a custom Calcite configuration. - */ - protected def getRuleSet: RuleSet = { - val calciteConfig = config.getCalciteConfig - calciteConfig.getRuleSet match { - - case None => - getBuiltInRuleSet - - case Some(ruleSet) => - if (calciteConfig.replacesRuleSet) { - ruleSet - } else { - RuleSets.ofList((getBuiltInRuleSet.asScala ++ ruleSet.asScala).asJava) - } - } - } - - /** - * Returns the SQL parser config for this environment including a custom Calcite configuration. - */ - protected def getSqlParserConfig: SqlParser.Config = { - val calciteConfig = config.getCalciteConfig - calciteConfig.getSqlParserConfig match { - - case None => - // we use Java lex because back ticks are easier than double quotes in programming - // and cases are preserved - SqlParser - .configBuilder() - .setLex(Lex.JAVA) - .build() - - case Some(sqlParserConfig) => - sqlParserConfig - } - } - - /** - * Returns the built-in rules that are defined by the environment. - */ - protected def getBuiltInRuleSet: RuleSet - - /** - * Registers a [[ScalarFunction]] under a unique name. Replaces already existing - * user-defined functions under this name. - */ - def registerFunction(name: String, function: ScalarFunction): Unit = { - // check if class could be instantiated - checkForInstantiation(function.getClass) - - // register in Table API - functionCatalog.registerFunction(name, function.getClass) - - // register in SQL API - functionCatalog.registerSqlFunction(createScalarSqlFunction(name, function, typeFactory)) - } - - /** - * Registers a [[TableFunction]] under a unique name. Replaces already existing - * user-defined functions under this name. - */ - private[flink] def registerTableFunctionInternal[T: TypeInformation]( - name: String, function: TableFunction[T]): Unit = { - // check if class not Scala object - checkNotSingleton(function.getClass) - // check if class could be instantiated - checkForInstantiation(function.getClass) - - val typeInfo: TypeInformation[_] = if (function.getResultType != null) { - function.getResultType - } else { - implicitly[TypeInformation[T]] - } - - // register in Table API - functionCatalog.registerFunction(name, function.getClass) - - // register in SQL API - val sqlFunctions = createTableSqlFunctions(name, function, typeInfo, typeFactory) - functionCatalog.registerSqlFunctions(sqlFunctions) - } - - /** - * Registers a [[Table]] under a unique name in the TableEnvironment's catalog. - * Registered tables can be referenced in SQL queries. - * - * @param name The name under which the table is registered. - * @param table The table to register. - */ - def registerTable(name: String, table: Table): Unit = { - - // check that table belongs to this table environment - if (table.tableEnv != this) { - throw new TableException( - "Only tables that belong to this TableEnvironment can be registered.") - } - - checkValidTableName(name) - val tableTable = new RelTable(table.getRelNode) - registerTableInternal(name, tableTable) - } - - /** - * Replaces a registered Table with another Table under the same name. - * We use this method to replace a [[org.apache.flink.api.table.plan.schema.DataStreamTable]] - * with a [[org.apache.calcite.schema.TranslatableTable]]. - * - * @param name Name of the table to replace. - * @param table The table that replaces the previous table. - */ - protected def replaceRegisteredTable(name: String, table: AbstractTable): Unit = { - - if (isRegistered(name)) { - tables.add(name, table) - } else { - throw new TableException(s"Table \'$name\' is not registered.") - } - } - - /** - * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]]. - * - * All tables referenced by the query must be registered in the TableEnvironment. - * - * @param query The SQL query to evaluate. - * @return The result of the query as Table. - */ - def sql(query: String): Table - - /** - * Writes a [[Table]] to a [[TableSink]]. - * - * @param table The [[Table]] to write. - * @param sink The [[TableSink]] to write the [[Table]] to. - * @tparam T The data type that the [[TableSink]] expects. - */ - private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit - - /** - * Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog. - * - * @param name The name under which the table is registered. - * @param table The table to register in the catalog - * @throws TableException if another table is registered under the provided name. - */ - @throws[TableException] - protected def registerTableInternal(name: String, table: AbstractTable): Unit = { - - if (isRegistered(name)) { - throw new TableException(s"Table \'$name\' already exists. " + - s"Please, choose a different name.") - } else { - tables.add(name, table) - } - } - - /** - * Checks if the chosen table name is valid. - * - * @param name The table name to check. - */ - protected def checkValidTableName(name: String): Unit - - /** - * Checks if a table is registered under the given name. - * - * @param name The table name to check. - * @return true, if a table is registered under the name, false otherwise. - */ - protected def isRegistered(name: String): Boolean = { - tables.getTableNames.contains(name) - } - - protected def getRowType(name: String): RelDataType = { - tables.getTable(name).getRowType(typeFactory) - } - - /** Returns a unique temporary attribute name. */ - private[flink] def createUniqueAttributeName(): String = { - "TMP_" + attrNameCntr.getAndIncrement() - } - - /** Returns the [[FlinkRelBuilder]] of this TableEnvironment. */ - private[flink] def getRelBuilder: FlinkRelBuilder = { - relBuilder - } - - /** Returns the Calcite [[org.apache.calcite.plan.RelOptPlanner]] of this TableEnvironment. */ - private[flink] def getPlanner: RelOptPlanner = { - planner - } - - /** Returns the [[FlinkTypeFactory]] of this TableEnvironment. */ - private[flink] def getTypeFactory: FlinkTypeFactory = { - typeFactory - } - - private[flink] def getFunctionCatalog: FunctionCatalog = { - functionCatalog - } - - /** Returns the Calcite [[FrameworkConfig]] of this TableEnvironment. */ - private[flink] def getFrameworkConfig: FrameworkConfig = { - frameworkConfig - } - - protected def validateType(typeInfo: TypeInformation[_]): Unit = { - val clazz = typeInfo.getTypeClass - if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) || - !Modifier.isPublic(clazz.getModifiers) || - clazz.getCanonicalName == null) { - throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " + - s"static and globally accessible.") - } - } - - /** - * Returns field names and field positions for a given [[TypeInformation]]. - * - * Field names are automatically extracted for - * [[org.apache.flink.api.common.typeutils.CompositeType]]. - * The method fails if inputType is not a - * [[org.apache.flink.api.common.typeutils.CompositeType]]. - * - * @param inputType The TypeInformation extract the field names and positions from. - * @tparam A The type of the TypeInformation. - * @return A tuple of two arrays holding the field names and corresponding field positions. - */ - protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]): - (Array[String], Array[Int]) = - { - validateType(inputType) - - val fieldNames: Array[String] = inputType match { - case t: TupleTypeInfo[A] => t.getFieldNames - case c: CaseClassTypeInfo[A] => c.getFieldNames - case p: PojoTypeInfo[A] => p.getFieldNames - case r: RowTypeInfo => r.getFieldNames - case tpe => - throw new TableException(s"Type $tpe lacks explicit field naming") - } - val fieldIndexes = fieldNames.indices.toArray - - if (fieldNames.contains("*")) { - throw new TableException("Field name can not be '*'.") - } - - (fieldNames, fieldIndexes) - } - - /** - * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of - * [[Expression]]. - * - * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated. - * @param exprs The expressions that define the field names. - * @tparam A The type of the TypeInformation. - * @return A tuple of two arrays holding the field names and corresponding field positions. - */ - protected[flink] def getFieldInfo[A]( - inputType: TypeInformation[A], - exprs: Array[Expression]): (Array[String], Array[Int]) = { - - validateType(inputType) - - val indexedNames: Array[(Int, String)] = inputType match { - case a: AtomicType[A] => - if (exprs.length != 1) { - throw new TableException("Table of atomic type can only have a single field.") - } - exprs.map { - case UnresolvedFieldReference(name) => (0, name) - case _ => throw new TableException("Field reference expression expected.") - } - case t: TupleTypeInfo[A] => - exprs.zipWithIndex.map { - case (UnresolvedFieldReference(name), idx) => (idx, name) - case (Alias(UnresolvedFieldReference(origName), name, _), _) => - val idx = t.getFieldIndex(origName) - if (idx < 0) { - throw new TableException(s"$origName is not a field of type $t") - } - (idx, name) - case _ => throw new TableException( - "Field reference expression or alias on field expression expected.") - } - case c: CaseClassTypeInfo[A] => - exprs.zipWithIndex.map { - case (UnresolvedFieldReference(name), idx) => (idx, name) - case (Alias(UnresolvedFieldReference(origName), name, _), _) => - val idx = c.getFieldIndex(origName) - if (idx < 0) { - throw new TableException(s"$origName is not a field of type $c") - } - (idx, name) - case _ => throw new TableException( - "Field reference expression or alias on field expression expected.") - } - case p: PojoTypeInfo[A] => - exprs.map { - case (UnresolvedFieldReference(name)) => - val idx = p.getFieldIndex(name) - if (idx < 0) { - throw new TableException(s"$name is not a field of type $p") - } - (idx, name) - case Alias(UnresolvedFieldReference(origName), name, _) => - val idx = p.getFieldIndex(origName) - if (idx < 0) { - throw new TableException(s"$origName is not a field of type $p") - } - (idx, name) - case _ => throw new TableException( - "Field reference expression or alias on field expression expected.") - } - case tpe => throw new TableException( - s"Source of type $tpe cannot be converted into Table.") - } - - val (fieldIndexes, fieldNames) = indexedNames.unzip - - if (fieldNames.contains("*")) { - throw new TableException("Field name can not be '*'.") - } - - (fieldNames.toArray, fieldIndexes.toArray) - } - -} - -/** - * Object to instantiate a [[TableEnvironment]] depending on the batch or stream execution - * environment. - */ -object TableEnvironment { - - /** - * Returns a [[JavaBatchTableEnv]] for a Java [[JavaBatchExecEnv]]. - * - * @param executionEnvironment The Java batch ExecutionEnvironment. - */ - def getTableEnvironment(executionEnvironment: JavaBatchExecEnv): JavaBatchTableEnv = { - new JavaBatchTableEnv(executionEnvironment, new TableConfig()) - } - - /** - * Returns a [[JavaBatchTableEnv]] for a Java [[JavaBatchExecEnv]] and a given [[TableConfig]]. - * - * @param executionEnvironment The Java batch ExecutionEnvironment. - * @param tableConfig The TableConfig for the new TableEnvironment. - */ - def getTableEnvironment( - executionEnvironment: JavaBatchExecEnv, - tableConfig: TableConfig): JavaBatchTableEnv = { - - new JavaBatchTableEnv(executionEnvironment, tableConfig) - } - - /** - * Returns a [[ScalaBatchTableEnv]] for a Scala [[ScalaBatchExecEnv]]. - * - * @param executionEnvironment The Scala batch ExecutionEnvironment. - */ - def getTableEnvironment(executionEnvironment: ScalaBatchExecEnv): ScalaBatchTableEnv = { - new ScalaBatchTableEnv(executionEnvironment, new TableConfig()) - } - - /** - * Returns a [[ScalaBatchTableEnv]] for a Scala [[ScalaBatchExecEnv]] and a given - * [[TableConfig]]. - * - * @param executionEnvironment The Scala batch ExecutionEnvironment. - * @param tableConfig The TableConfig for the new TableEnvironment. - */ - def getTableEnvironment( - executionEnvironment: ScalaBatchExecEnv, - tableConfig: TableConfig): ScalaBatchTableEnv = { - - new ScalaBatchTableEnv(executionEnvironment, tableConfig) - } - - /** - * Returns a [[JavaStreamTableEnv]] for a Java [[JavaStreamExecEnv]]. - * - * @param executionEnvironment The Java StreamExecutionEnvironment. - */ - def getTableEnvironment(executionEnvironment: JavaStreamExecEnv): JavaStreamTableEnv = { - new JavaStreamTableEnv(executionEnvironment, new TableConfig()) - } - - /** - * Returns a [[JavaStreamTableEnv]] for a Java [[JavaStreamExecEnv]] and a given [[TableConfig]]. - * - * @param executionEnvironment The Java StreamExecutionEnvironment. - * @param tableConfig The TableConfig for the new TableEnvironment. - */ - def getTableEnvironment( - executionEnvironment: JavaStreamExecEnv, - tableConfig: TableConfig): JavaStreamTableEnv = { - - new JavaStreamTableEnv(executionEnvironment, tableConfig) - } - - /** - * Returns a [[ScalaStreamTableEnv]] for a Scala stream [[ScalaStreamExecEnv]]. - * - * @param executionEnvironment The Scala StreamExecutionEnvironment. - */ - def getTableEnvironment(executionEnvironment: ScalaStreamExecEnv): ScalaStreamTableEnv = { - new ScalaStreamTableEnv(executionEnvironment, new TableConfig()) - } - - /** - * Returns a [[ScalaStreamTableEnv]] for a Scala stream [[ScalaStreamExecEnv]]. - * - * @param executionEnvironment The Scala StreamExecutionEnvironment. - * @param tableConfig The TableConfig for the new TableEnvironment. - */ - def getTableEnvironment( - executionEnvironment: ScalaStreamExecEnv, - tableConfig: TableConfig): ScalaStreamTableEnv = { - - new ScalaStreamTableEnv(executionEnvironment, tableConfig) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala deleted file mode 100644 index a988152..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table - -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo} -import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo - -/** - * This class enumerates all supported types of the Table API. - */ -object Types { - - val STRING = BasicTypeInfo.STRING_TYPE_INFO - val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO - - val BYTE = BasicTypeInfo.BYTE_TYPE_INFO - val SHORT = BasicTypeInfo.SHORT_TYPE_INFO - val INT = BasicTypeInfo.INT_TYPE_INFO - val LONG = BasicTypeInfo.LONG_TYPE_INFO - val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO - val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO - val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO - - val DATE = SqlTimeTypeInfo.DATE - val TIME = SqlTimeTypeInfo.TIME - val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP - val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS - val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS - -} http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenException.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenException.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenException.scala deleted file mode 100644 index 8b7559f..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenException.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.codegen - -/** - * Exception for all errors occurring during code generation. - */ -class CodeGenException(msg: String) extends RuntimeException(msg)