flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6196) Support dynamic schema in Table Function
Date Wed, 05 Apr 2017 16:50:42 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957220#comment-15957220 ]

ASF GitHub Bot commented on FLINK-6196:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3623#discussion_r109964147
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
---
    @@ -109,6 +110,133 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[
       }
     }
     
    +class DynamicSchema extends TableFunction[Row] {
    +
    +  def eval(str: String, column: Int): Unit = {
    +    if (str.contains("#")) {
    +      str.split("#").foreach({ s =>
    +        val row = new Row(column)
    +        row.setField(0, s)
    +        for (i <- 1 until column) {
    +          row.setField(i, s.length)
    +        }
    +        collect(row)
    +      })
    +    }
    +  }
    +
    +  override def getResultType(arguments: java.util.List[AnyRef]): TypeInformation[Row] = {
    +    val column = arguments.get(1).asInstanceOf[Int]
    +    val basicTypeInfos = new Array[TypeInformation[_]](column)
    +    basicTypeInfos(0) = BasicTypeInfo.STRING_TYPE_INFO
    +    for (i <- 1 until column) {
    +      basicTypeInfos(i) = BasicTypeInfo.INT_TYPE_INFO
    +    }
    +    new RowTypeInfo(basicTypeInfos: _*)
    +  }
    +}
    +
    +class DynamicSchema0 extends TableFunction[Row] {
    +
    +  def eval(str: String, cols: String): Unit = {
    +    val columns = cols.split(",")
    +
    +    if (str.contains("#")) {
    +      str.split("#").foreach({ s =>
    +        val row = new Row(columns.length)
    +        row.setField(0, s)
    +        for (i <- 1 until columns.length) {
    +          if (columns(i).equals("string")) {
    +            row.setField(i, s.length.toString)
    +          } else if (columns(i).equals("int")) {
    +            row.setField(i, s.length)
    +          }
    +        }
    +        collect(row)
    +      })
    +    }
    +  }
    +
    +  override def getResultType(arguments: java.util.List[AnyRef]): TypeInformation[Row] = {
    +    val columnStr = arguments.get(1).asInstanceOf[String]
    +    val columns = columnStr.split(",")
    +
    +    val basicTypeInfos = new Array[TypeInformation[_]](columns.length)
    --- End diff --
    
    can be simplified to 
    ```scala
    val basicTypeInfos = for (c <- columns) yield c match {
      case "string" => BasicTypeInfo.STRING_TYPE_INFO
      case "int" => BasicTypeInfo.INT_TYPE_INFO
    }
    ```
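The suggested `for`/`yield` rewrite can be sketched in plain Scala without Flink on the classpath. Note that the bare `match` in the snippet is non-exhaustive and would throw a `MatchError` on an unrecognized column type; the catch-all case below is an addition for illustration, not part of the reviewer's snippet, and the string type-info values stand in for Flink's `BasicTypeInfo` constants:

```scala
// Plain-Scala sketch of the for/yield simplification.
// The string values stand in for BasicTypeInfo.STRING_TYPE_INFO /
// BasicTypeInfo.INT_TYPE_INFO from Flink.
object SchemaSketch {
  def typeInfos(columns: Array[String]): Array[String] =
    for (c <- columns) yield c match {
      case "string" => "STRING_TYPE_INFO"
      case "int"    => "INT_TYPE_INFO"
      // added default case: the original match would throw a MatchError here
      case other    => throw new IllegalArgumentException(s"unsupported column type: $other")
    }
}
```

For example, `SchemaSketch.typeInfos("string,int".split(","))` yields an array of the two stand-in type names in order.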


> Support dynamic schema in Table Function
> ----------------------------------------
>
>                 Key: FLINK-6196
>                 URL: https://issues.apache.org/jira/browse/FLINK-6196
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Zhuoluo Yang
>            Assignee: Zhuoluo Yang
>
> In many of our use cases, we have to decide the schema of a UDTF at run time. For example, udtf('c1, c2, c3') will generate three columns for a lateral view.
> Most systems, such as Calcite and Hive, support this feature. However, the current implementation in Flink does not handle it correctly.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
