flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
Date Fri, 22 Apr 2016 12:07:12 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253808#comment-15253808
] 

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1916#discussion_r60728611
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
---
    @@ -0,0 +1,188 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.logical
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.`type`.RelDataTypeFactory
    +import org.apache.calcite.rel.core.JoinRelType
    +import org.apache.calcite.rel.logical.LogicalProject
    +import org.apache.calcite.schema.{Table => CTable}
    +import org.apache.calcite.tools.RelBuilder
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.api.java.operators.join.JoinType
    +import org.apache.flink.api.table.expressions._
    +import org.apache.flink.api.table.typeutils.TypeConverter
    +import org.apache.flink.api.table.validate.ValidationException
    +
    +case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode
{
    +  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
    +    def allAlias: Boolean = {
    +      projectList.forall { proj =>
    +        proj match {
    +          case Alias(r: ResolvedFieldReference, name) => true
    +          case _ => false
    +        }
    +      }
    +    }
    +    child.toRelNode(relBuilder)
    +    if (allAlias) {
    +      relBuilder.push(
    +        LogicalProject.create(relBuilder.peek(),
    +          projectList.map(_.toRexNode(relBuilder)).asJava,
    +          projectList.map(_.name).asJava))
    +    } else {
    +      relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
    +    }
    +  }
    +}
    +
    +case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode
{
    +  override def output: Seq[Attribute] =
    +    throw new UnresolvedException("Invalid call to output on AliasNode")
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
    +    throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
    +
    +  override lazy val resolved: Boolean = false
    +}
    +
    +case class Distinct(child: LogicalNode) extends UnaryNode {
    +  override def output: Seq[Attribute] = child.output
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
    +    child.toRelNode(relBuilder)
    +    relBuilder.distinct()
    +  }
    +}
    +
    +case class Filter(condition: Expression, child: LogicalNode) extends UnaryNode {
    +  override def output: Seq[Attribute] = child.output
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
    +    child.toRelNode(relBuilder)
    +    relBuilder.filter(condition.toRexNode(relBuilder))
    +  }
    +}
    +
    +case class Aggregate(
    +    groupingExpressions: Seq[Expression],
    +    aggregateExpressions: Seq[NamedExpression],
    +    child: LogicalNode) extends UnaryNode {
    +
    +  override def output: Seq[Attribute] = {
    +    (groupingExpressions ++ aggregateExpressions) map { agg =>
    +      agg match {
    +        case ne: NamedExpression => ne.toAttribute
    +        case e => Alias(e, e.toString).toAttribute
    +      }
    +    }
    +  }
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
    +    child.toRelNode(relBuilder)
    +    relBuilder.aggregate(
    +      relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava),
    +      aggregateExpressions.filter(_.isInstanceOf[Alias]).map { e =>
    +        e match {
    +          case Alias(agg: Aggregation, name) => agg.toAggCall(name)(relBuilder)
    +          case _ => null // this should never happen
    +        }
    +      }.asJava)
    +  }
    +}
    +
    +case class Union(left: LogicalNode, right: LogicalNode) extends BinaryNode {
    +  override def output: Seq[Attribute] = left.output
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
    +    left.toRelNode(relBuilder)
    +    right.toRelNode(relBuilder)
    +    relBuilder.union(true)
    +  }
    +}
    --- End diff --
    
    Need to override the `resolved` field to check equality of `left` and `right` output schema.
Will do in next commit.


> Add a validation phase before construct RelNode using TableAPI
> --------------------------------------------------------------
>
>                 Key: FLINK-3754
>                 URL: https://issues.apache.org/jira/browse/FLINK-3754
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API
>    Affects Versions: 1.0.0
>            Reporter: Yijie Shen
>            Assignee: Yijie Shen
>
> Unlike sql string's execution, which have a separate validation phase before RelNode
construction, Table API lacks the counterparts and the validation is scattered in many places.
> I suggest to add a single validation phase and detect problems as early as possible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message