flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3226) Translate optimized logical Table API plans into physical plans representing DataSet programs
Date Tue, 09 Feb 2016 12:38:18 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15138881#comment-15138881
] 

ASF GitHub Bot commented on FLINK-3226:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1595#discussion_r52301780
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.codegen
    +
    +import java.util.concurrent.atomic.AtomicInteger
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo._
    +import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
    +import org.apache.flink.api.common.typeutils.CompositeType
    +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
    +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
    +import org.apache.flink.api.table.typeinfo.RowTypeInfo
    +
    +object CodeGenUtils {
    +
    +  private val nameCounter = new AtomicInteger
    +
    +  def newName(name: String): String = {
    +    s"$name$$${nameCounter.getAndIncrement}"
    +  }
    +
    +  // when casting we first need to unbox Primitives, for example,
    +  // float a = 1.0f;
    +  // byte b = (byte) a;
    +  // works, but for boxed types we need this:
    +  // Float a = 1.0f;
    +  // Byte b = (byte)(float) a;
    +  def primitiveTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
    +    case INT_TYPE_INFO => "int"
    +    case LONG_TYPE_INFO => "long"
    +    case SHORT_TYPE_INFO => "short"
    +    case BYTE_TYPE_INFO => "byte"
    +    case FLOAT_TYPE_INFO => "float"
    +    case DOUBLE_TYPE_INFO => "double"
    +    case BOOLEAN_TYPE_INFO => "boolean"
    +    case CHAR_TYPE_INFO => "char"
    +
    +    // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
    +    // does not seem to like this, so we manually give the correct type here.
    +    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
    +    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
    +    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
    +    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
    +    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
    +    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
    +    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
    +    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
    +
    +    case _ =>
    +      tpe.getTypeClass.getCanonicalName
    +  }
    +
    +  def boxedTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
    +    // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
    +    // does not seem to like this, so we manually give the correct type here.
    +    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
    +    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
    +    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
    +    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
    +    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
    +    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
    +    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
    +    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
    +
    +    case _ =>
    +      tpe.getTypeClass.getCanonicalName
    +  }
    +
    +  def primitiveDefaultValue(tpe: TypeInformation[_]): String = tpe match {
    +    case INT_TYPE_INFO => "-1"
    +    case LONG_TYPE_INFO => "-1"
    +    case SHORT_TYPE_INFO => "-1"
    +    case BYTE_TYPE_INFO => "-1"
    +    case FLOAT_TYPE_INFO => "-1.0f"
    +    case DOUBLE_TYPE_INFO => "-1.0d"
    +    case BOOLEAN_TYPE_INFO => "false"
    +    case STRING_TYPE_INFO => "\"<empty>\""
    +    case CHAR_TYPE_INFO => "'\\0'"
    +    case _ => "null"
    +  }
    +
    +  def requireNumeric(genExpr: GeneratedExpression) = genExpr.resultType match {
    +    case nti: NumericTypeInfo[_] => // ok
    +    case _ => throw new CodeGenException("Numeric expression type expected.")
    +  }
    +
    +  def requireString(genExpr: GeneratedExpression) = genExpr.resultType match {
    +    case STRING_TYPE_INFO => // ok
    +    case _ => throw new CodeGenException("String expression type expected.")
    +  }
    +
    +  def requireBoolean(genExpr: GeneratedExpression) = genExpr.resultType match {
    +    case BOOLEAN_TYPE_INFO => // ok
    +    case _ => throw new CodeGenException("Boolean expression type expected.")
    +  }
    +
    +  def isReference(genExpr: GeneratedExpression): Boolean = isReference(genExpr.resultType)
    +
    +  def isReference(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
    +    case INT_TYPE_INFO
    +         | LONG_TYPE_INFO
    +         | SHORT_TYPE_INFO
    +         | BYTE_TYPE_INFO
    +         | FLOAT_TYPE_INFO
    +         | DOUBLE_TYPE_INFO
    +         | BOOLEAN_TYPE_INFO
    +         | CHAR_TYPE_INFO => false
    +    case _ => true
    +  }
    +
    +  def isNumeric(genExpr: GeneratedExpression): Boolean = isNumeric(genExpr.resultType)
    +
    +  def isNumeric(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
    +    case nti: NumericTypeInfo[_] => true
    +    case _ => false
    +  }
    +
    +  def isString(genExpr: GeneratedExpression): Boolean = isString(genExpr.resultType)
    +
    +  def isString(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
    +    case STRING_TYPE_INFO => true
    +    case _ => false
    +  }
    +
    +  def isBoolean(genExpr: GeneratedExpression): Boolean = isBoolean(genExpr.resultType)
    +
    +  def isBoolean(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
    +    case BOOLEAN_TYPE_INFO => true
    +    case _ => false
    +  }
    +
    +  // ----------------------------------------------------------------------------------------------
    +
    +  sealed abstract class FieldAccessor
    +
    +  case class ObjectFieldAccessor(fieldName: String) extends FieldAccessor
    +
    +  case class ObjectMethodAccessor(methodName: String) extends FieldAccessor
    +
    +  case class ProductAccessor(i: Int) extends FieldAccessor
    +
    +  def fieldAccessorFor(compType: CompositeType[_], index: Int): FieldAccessor = {
    +    compType match {
    +      case ri: RowTypeInfo =>
    +        ProductAccessor(index)
    +
    +      case cc: CaseClassTypeInfo[_] =>
    +        ObjectMethodAccessor(cc.getFieldNames()(index))
    +
    +      case javaTup: TupleTypeInfo[_] =>
    +        ObjectFieldAccessor("f" + index)
    +
    +      case pj: PojoTypeInfo[_] =>
    +        ObjectFieldAccessor(pj.getFieldNames()(index))
    --- End diff --
    
    I forgot to mention this in the limitations list. I will implement a case distinction
and call the getters for private fields in my next commit.


> Translate optimized logical Table API plans into physical plans representing DataSet
programs
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-3226
>                 URL: https://issues.apache.org/jira/browse/FLINK-3226
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API
>            Reporter: Fabian Hueske
>            Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see FLINK-3225) query
plan into a physical plan. The physical plan is a 1-to-1 representation of the DataSet program
that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one Flink DataSet
operator (Map, Reduce, Join, ...). The RelNodes must hold all relevant operator information
(keys, user-code expression, strategy hints, parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink RelNodes. We start
with a straight-forward mapping and later add rules that merge several relational operators
into a single Flink operator, e.g., merge a join followed by a filter. Timo implemented some
rules for the first SQL implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message