flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhueske <...@git.apache.org>
Subject [GitHub] flink pull request #5132: [FLINK-8203] [FLINK-7681] [table] Make schema defi...
Date Thu, 04 Jan 2018 23:26:15 GMT
Github user fhueske commented on a diff in the pull request:

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
    @@ -768,6 +768,39 @@ abstract class TableEnvironment(val config: TableConfig) {
    +  /**
    +    * Reference input fields by name:
    +    * All fields in the schema definition are referenced by name
    +    * (and possibly renamed using an alias (as). In this mode, fields can be reordered
    +    * projected out. Moreover, we can define proctime and rowtime attributes at arbitrary
    +    * positions using arbitrary names (except those that exist in the result schema).
This mode
    +    * can be used for any input type, including POJOs.
    +    *
    +    * Reference input fields by position:
    +    * Field references must refer to existing fields in the input type (except for
    +    * renaming with alias (as)). In this mode, fields are simply renamed. Event-time
attributes can
    +    * replace the field on their position in the input data (if it is of correct type)
or be
    +    * appended at the end. Proctime attributes must be appended at the end. This mode
can only be
    +    * used if the input type has a defined field order (tuple, case class, Row) and no
of fields
    +    * references a field of the input type.
    +    */
    +  protected def isReferenceByPosition(t: TypeInformation[_], fields: Array[Expression]):
Boolean = {
    +    if (t.isInstanceOf[PojoTypeInfo[_]]) {
    +      return false
    +    }
    +    val inputNames = t match {
    +      case ct: CompositeType[_] => ct.getFieldNames
    +      case _ => return false // atomic types are references by name
    +    }
    +    // use the by position mode if no of the fields exists in the input
    +    fields.forall {
    +      case UnresolvedFieldReference(name) => !inputNames.contains(name)
    --- End diff --
    I had suggested to use ref-by-name if all field names of the schema are in the input type
and ref-by-pos otherwise. This implementation is a bit different as it excludes cases where
at least one but not all of the schema fields are in the input type. Hence, there schema mappings
that cannot be resolved and will fail.
    For example if we have a Tuple3 input type, we cannot name fields like `('f0, 'f1, 'myName)`.
This breaks the current behavior. On the other hand, it prevents possibly confusing cases
like `('f2, 'f0, 'myName)` where fields are renamed by position but the user might assume
reordering instead of renaming.
    Not sure what's the best strategy here, but I wanted to raise this point.


View raw message