flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhueske <...@git.apache.org>
Subject [GitHub] flink pull request #5132: [FLINK-8203] [FLINK-7681] [table] Make schema defi...
Date Thu, 04 Jan 2018 23:26:15 GMT
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5132#discussion_r159764095
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -791,92 +824,109 @@ abstract class TableEnvironment(val config: TableConfig) {
         * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
         * [[Expression]]. It does not handle time attributes but considers them in indices.
         *
    +    * @param isReferenceByPosition schema mode see [[isReferenceByPosition()]]
         * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
         * @param exprs     The expressions that define the field names.
         * @tparam A The type of the TypeInformation.
         * @return A tuple of two arrays holding the field names and corresponding field positions.
         */
    -  protected[flink] def getFieldInfo[A](
    +  protected def getFieldInfo[A](
    +      isReferenceByPosition: Boolean,
           inputType: TypeInformation[A],
           exprs: Array[Expression])
         : (Array[String], Array[Int]) = {
     
         TableEnvironment.validateType(inputType)
     
    +    def referenceByName(name: String, ct: CompositeType[_]): Option[(Int, String)] = {
    +      val inputIdx = ct.getFieldIndex(name)
    +      if (inputIdx < 0) {
    +        throw new TableException(s"$name is not a field of type $ct. " +
    +                s"Expected: ${ct.getFieldNames.mkString(", ")}")
    +      } else {
    +        Some((inputIdx, name))
    +      }
    +    }
    +
         val indexedNames: Array[(Int, String)] = inputType match {
    +
           case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
             throw new TableException(
               "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
                 "Please specify the type of the input with a RowTypeInfo.")
    -      case a: AtomicType[_] =>
    -        exprs.zipWithIndex flatMap {
    -          case (_: TimeAttribute, _) =>
    -            None
    -          case (UnresolvedFieldReference(name), idx) if idx > 0 =>
    -            // only accept the first field for an atomic type
    -            throw new TableException("Only the first field can reference an atomic type.")
    -          case (UnresolvedFieldReference(name), idx) =>
    -            // first field reference is mapped to atomic type
    -            Some((0, name))
    -          case _ => throw new TableException("Field reference expression requested.")
    -        }
    +
           case t: TupleTypeInfo[A] =>
             exprs.zipWithIndex flatMap {
    -          case (UnresolvedFieldReference(name), idx) =>
    -            Some((idx, name))
    -          case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
    +          case (UnresolvedFieldReference(name: String), idx) =>
    +            if (isReferenceByPosition) {
    +              Some((idx, name))
    +            } else {
    +              referenceByName(name, t)
    +            }
    +          case (Alias(UnresolvedFieldReference(origName), name: String, _), _) =>
                 val idx = t.getFieldIndex(origName)
    --- End diff --
    
    We can use `referenceByName()` here instead of calling `t.getFieldIndex(origName)` directly.
      


---

Mime
View raw message