flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4469) Add support for user defined table function in Table API & SQL
Date Fri, 18 Nov 2016 03:27:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15675610#comment-15675610 ]

ASF GitHub Bot commented on FLINK-4469:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2653#discussion_r88594496
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala
---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.functions.utils
    +
    +import com.google.common.base.Predicate
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.sql._
    +import org.apache.calcite.sql.`type`._
    +import org.apache.calcite.sql.parser.SqlParserPos
    +import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction
    +import org.apache.calcite.util.Util
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.table.functions.TableFunction
    +import org.apache.flink.api.table.FlinkTypeFactory
    +
    +import scala.collection.JavaConversions._
    +import java.util
    +
    +import org.apache.flink.api.table.plan.schema.FlinkTableFunctionImpl
    +
    +/**
    +  * Calcite wrapper for user-defined table functions.
    +  */
    +class TableSqlFunction(
    +    name: String,
    +    udtf: TableFunction[_],
    +    rowTypeInfo: TypeInformation[_],
    +    returnTypeInference: SqlReturnTypeInference,
    +    operandTypeInference: SqlOperandTypeInference,
    +    operandTypeChecker: SqlOperandTypeChecker,
    +    paramTypes: util.List[RelDataType],
    +    functionImpl: FlinkTableFunctionImpl[_])
    +  extends SqlUserDefinedTableFunction(
    +    new SqlIdentifier(name, SqlParserPos.ZERO),
    +    returnTypeInference,
    +    operandTypeInference,
    +    operandTypeChecker,
    +    paramTypes,
    +    functionImpl) {
    +
    +  def getTableFunction = udtf
    +
    +  def getRowTypeInfo = rowTypeInfo
    +
    +  def getPojoFieldMapping = functionImpl.fieldIndexes
    +
    +}
    +
    +object TableSqlFunction {
    +  /**
    +    * Creates a Calcite [[TableSqlFunction]] for a user-defined table function.
    +    *
    +    * @param name function name (used by SQL parser)
    +    * @param udtf user defined table function to be called
    +    * @param rowTypeInfo the row type information generated by the table function
    +    * @param typeFactory type factory for converting between Flink's and Calcite's types
    +    * @param functionImpl Calcite table function schema
    +    * @return the [[TableSqlFunction]] wrapping the given table function
    +    */
    +  def apply(
    +    name: String,
    +    udtf: TableFunction[_],
    +    rowTypeInfo: TypeInformation[_],
    +    typeFactory: FlinkTypeFactory,
    +    functionImpl: FlinkTableFunctionImpl[_]): TableSqlFunction = {
    +
    +    val argTypes: util.List[RelDataType] = new util.ArrayList[RelDataType]
    +    val typeFamilies: util.List[SqlTypeFamily] = new util.ArrayList[SqlTypeFamily]
    +    for (o <- functionImpl.getParameters) {
    +      val relType: RelDataType = o.getType(typeFactory)
    +      argTypes.add(relType)
    +      typeFamilies.add(Util.first(relType.getSqlTypeName.getFamily, SqlTypeFamily.ANY))
    --- End diff --
    
    sure
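
For readers following the diff: apply walks functionImpl.getParameters, collecting each parameter's RelDataType and its SqlTypeFamily, and uses Util.first to fall back to SqlTypeFamily.ANY when a type has no family. Below is a minimal, self-contained Java sketch of that null-fallback collection pattern; the string family names are illustrative placeholders, not Calcite's API.

```java
import java.util.ArrayList;
import java.util.List;

public class FamilyFallbackSketch {
    // Mirrors the shape of Calcite's Util.first(a, b): return a unless it is null.
    static <T> T first(T preferred, T fallback) {
        return preferred != null ? preferred : fallback;
    }

    public static void main(String[] args) {
        // Hypothetical per-parameter families; null models a type with no family.
        List<String> declared = new ArrayList<>();
        declared.add("NUMERIC");
        declared.add(null);
        declared.add("CHARACTER");

        List<String> families = new ArrayList<>();
        for (String f : declared) {
            families.add(first(f, "ANY")); // fall back to ANY, as in the diff
        }
        System.out.println(families); // [NUMERIC, ANY, CHARACTER]
    }
}
```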


> Add support for user defined table function in Table API & SQL
> --------------------------------------------------------------
>
>                 Key: FLINK-4469
>                 URL: https://issues.apache.org/jira/browse/FLINK-4469
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take a single input row and produce
> a single output row. In contrast, table-generating functions transform a single input
> row into multiple output rows. This is very useful in some cases, such as looking up
> HBase by rowkey and returning one or more rows.
> Adding a user defined table function should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval methods
> NOTE: 
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by the table function. Because of Java
> type erasure, we can’t extract T from the Iterable.
> 3. use {{collect(T)}} to emit a table row
> 4. eval methods can be overloaded. Blink will choose the best-matching eval method to
> call according to parameter types and count.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
>   public Word(String word, Integer length) {
>     this.word = word;
>     this.length = length;
>   }
> }
> public class SplitStringUDTF extends UDTF<Word> {
>     public void eval(String str) {
>         if (str != null) {
>             for (String s : str.split(",")) {
>                 collect(new Word(s, s.length()));
>             }
>         }
>     }
> }
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF());
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS t(w,l)");
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF());
> // rename split table columns to "w" and "l"
> table.crossApply("split(c) as (w, l)")
>      .select("a, b, w, l");
> // without renaming, we will use the origin field names in the POJO/case/...
> table.crossApply("split(c)")
>      .select("a, b, word, length");
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>      .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>      .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1] https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#
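
As an aside on NOTE point 4 above (overloaded eval methods), here is a minimal, self-contained Java sketch of the idea that runs outside Flink. The UDTF base class below is a hypothetical stand-in that buffers collect()-ed rows; it is not Flink's actual API, and overload resolution here is plain Java's, not the planner's.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the proposed UDTF base class: collect(T)
// buffers emitted rows so the sketch is self-contained and runnable.
abstract class UDTF<T> {
    private final List<T> buffer = new ArrayList<>();
    protected void collect(T row) { buffer.add(row); }
    public List<T> getRows() { return buffer; }
}

// Overloaded eval methods: the planner would pick the best match by
// parameter types and count; here we simply call each overload directly.
class SplitUDTF extends UDTF<String> {
    // split on a default delimiter
    public void eval(String str) {
        eval(str, ",");
    }
    // split on a caller-supplied delimiter (a regex, as String.split expects)
    public void eval(String str, String delimiter) {
        if (str != null) {
            for (String s : str.split(delimiter)) {
                collect(s);
            }
        }
    }
}

public class OverloadSketch {
    public static void main(String[] args) {
        SplitUDTF udtf = new SplitUDTF();
        udtf.eval("a,b,c");        // one-argument overload
        udtf.eval("x|y", "\\|");   // two-argument overload
        System.out.println(udtf.getRows()); // [a, b, c, x, y]
    }
}
```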



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
