flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5280) Extend TableSource to support nested data
Date Fri, 23 Dec 2016 12:23:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772759#comment-15772759 ]

ASF GitHub Bot commented on FLINK-5280:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3039#discussion_r93758297
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -535,4 +509,74 @@ object TableEnvironment {
     
         new ScalaStreamTableEnv(executionEnvironment, tableConfig)
       }
    +
    +  /**
    +    * Returns field names and field positions for a given [[TypeInformation]].
    +    *
    +    * Field names are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    *
    +    * @param inputType The TypeInformation to extract the field names and positions from.
    +    * @tparam A The type of the TypeInformation.
    +    * @return A tuple of two arrays holding the field names and corresponding field positions.
    +    */
    +  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
    +    validateType(inputType)
    +
    +    val fieldNames: Array[String] = inputType match {
    +      case t: TupleTypeInfo[A] => t.getFieldNames
    +      case c: CaseClassTypeInfo[A] => c.getFieldNames
    +      case p: PojoTypeInfo[A] => p.getFieldNames
    +      case r: RowTypeInfo => r.getFieldNames
    +      case tpe =>
    +        throw new TableException(s"Type $tpe lacks explicit field naming")
    +    }
    +    val fieldIndexes = fieldNames.indices.toArray
    +
    +    if (fieldNames.contains("*")) {
    +      throw new TableException("Field name can not be '*'.")
    +    }
    +
    +    (fieldNames, fieldIndexes)
    +  }
    +
    +  def validateType(typeInfo: TypeInformation[_]): Unit = {
    +    val clazz = typeInfo.getTypeClass
    +    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
    +      !Modifier.isPublic(clazz.getModifiers) ||
    +      clazz.getCanonicalName == null) {
    +      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
    +        s"static and globally accessible.")
    +    }
    +  }
    +
    +  /**
    +    * Returns field types for a given [[TypeInformation]].
    +    *
    +    * Field types are automatically extracted for
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    * The method fails if inputType is not a
    +    * [[org.apache.flink.api.common.typeutils.CompositeType]].
    +    *
    +    * @param inputType The TypeInformation to extract field types from.
    +    * @return An array holding the field types.
    +    */
    +  def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = {
    +    validateType(inputType)
    +
    +    inputType match {
    +      case t: TupleTypeInfo[_] => getTypes(t)
    +      case c: CaseClassTypeInfo[_] => getTypes(c)
    +      case p: PojoTypeInfo[_] => getTypes(p)
    +      case r: RowTypeInfo => getTypes(r)
    +      case tpe =>
    --- End diff --
    
    Same here: this should also support `AtomicType`.
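A minimal, self-contained sketch of what the review asks for, assuming a simplified model of the type hierarchy: `TypeDesc`, `CompositeDesc`, `AtomicDesc`, and `OpaqueDesc` are hypothetical stand-ins, not Flink classes, and `IllegalArgumentException` replaces `TableException` so the code runs without Flink on the classpath. The validation reuses the reflection check from `validateType` in the diff. Giving an atomic type a single field named `f0` is an assumption made for illustration, not something this thread specifies.

```scala
import java.lang.reflect.Modifier

// Hypothetical stand-ins for Flink's TypeInformation hierarchy (NOT Flink classes).
sealed trait TypeDesc { def typeClass: Class[_] }
// Composite type with named, typed fields (cf. tuples, case classes, POJOs, rows).
final case class CompositeDesc(
    typeClass: Class[_],
    fieldNames: Array[String],
    fieldTypes: Array[TypeDesc]) extends TypeDesc
// Atomic type such as String or Integer: a single value, no named fields.
final case class AtomicDesc(typeClass: Class[_]) extends TypeDesc
// Anything else, e.g. a generic type the helpers cannot inspect.
final case class OpaqueDesc(typeClass: Class[_]) extends TypeDesc

object FieldInfo {
  // Assumption: the name given to the single field of an atomic type.
  val AtomicFieldName = "f0"

  // Same reflection check as validateType in the diff: the described class
  // must be static (if it is a member class), public, and have a canonical name.
  def validateType(t: TypeDesc): Unit = {
    val clazz = t.typeClass
    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
        !Modifier.isPublic(clazz.getModifiers) ||
        clazz.getCanonicalName == null) {
      throw new IllegalArgumentException(
        s"Class '$clazz' described in '$t' must be static and globally accessible.")
    }
  }

  // Field names plus positions 0..n-1; an atomic type yields exactly one field.
  def getFieldInfo(t: TypeDesc): (Array[String], Array[Int]) = {
    validateType(t)
    val names: Array[String] = t match {
      case c: CompositeDesc => c.fieldNames
      case _: AtomicDesc    => Array(AtomicFieldName)
      case other =>
        throw new IllegalArgumentException(s"Type $other lacks explicit field naming")
    }
    if (names.contains("*")) {
      throw new IllegalArgumentException("Field name can not be '*'.")
    }
    (names, names.indices.toArray)
  }

  // Field types; an atomic type is its own (single) field type.
  def getFieldTypes(t: TypeDesc): Array[TypeDesc] = {
    validateType(t)
    t match {
      case c: CompositeDesc => c.fieldTypes
      case a: AtomicDesc    => Array[TypeDesc](a)
      case other =>
        throw new IllegalArgumentException(s"Type $other lacks explicit field types")
    }
  }
}
```

Returning the atomic type itself as its only field type keeps the names, positions, and types arrays the same length for every accepted type.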


> Extend TableSource to support nested data
> -----------------------------------------
>
>                 Key: FLINK-5280
>                 URL: https://issues.apache.org/jira/browse/FLINK-5280
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface currently supports only the definition of flat rows.
>
> However, there are several storage formats for nested data that should be supported, such as Avro, JSON, Parquet, and ORC. The Table API and SQL can also natively handle nested rows.
> The {{TableSource}} interface and the code that registers table sources in Calcite's schema need to be extended to support nested data.
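To make "nested rows" concrete, here is a hypothetical model of a nested row schema, such as one derived from an Avro or Parquet record, together with a helper that lists every leaf field as a dotted path. `FieldType`, `Leaf`, `Nested`, and `leafPaths` are illustrative names only; they are not part of the `TableSource` interface or of the change proposed in this pull request.

```scala
// Hypothetical model of a nested row schema (NOT a Flink API).
sealed trait FieldType
// A leaf field carrying a primitive type name, e.g. "String" or "Int".
final case class Leaf(typeName: String) extends FieldType
// A nested row: an ordered list of (field name, field type) pairs.
final case class Nested(fields: List[(String, FieldType)]) extends FieldType

object NestedSchema {
  // Lists every leaf field as (dotted path, type name),
  // depth-first and in declaration order.
  def leafPaths(row: Nested, prefix: String = ""): List[(String, String)] =
    row.fields.flatMap {
      case (name, Leaf(t))   => List((prefix + name, t))
      case (name, n: Nested) => leafPaths(n, prefix + name + ".")
    }
}
```

For a record `user(name: String, address(city: String, zip: Int))`, `leafPaths` yields `name`, `address.city`, and `address.zip`, whereas a flat-row source would need to flatten or drop the `address` sub-record.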



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
