flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhueske <...@git.apache.org>
Subject [GitHub] flink pull request #5132: [FLINK-8203] [FLINK-7681] [table] Make schema defi...
Date Fri, 05 Jan 2018 14:36:22 GMT
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5132#discussion_r159876208
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -446,52 +443,60 @@ abstract class StreamTableEnvironment(
         * Checks for at most one rowtime and proctime attribute.
         * Returns the time attributes.
         *
    -    * @param isReferenceByPosition schema mode see [[isReferenceByPosition()]]
    -    *
         * @return rowtime attribute and proctime attribute
         */
       private def validateAndExtractTimeAttributes(
    -    isReferenceByPosition: Boolean,
         streamType: TypeInformation[_],
         exprs: Array[Expression])
       : (Option[(Int, String)], Option[(Int, String)]) = {
     
    -    val fieldTypes: Array[TypeInformation[_]] = streamType match {
    -      case c: CompositeType[_] => (0 until c.getArity).map(i => c.getTypeAt(i)).toArray
    -      case t: TypeInformation[_] => Array(t)
    +    val (isRefByPos, fieldTypes) = streamType match {
    +      case c: CompositeType[_] =>
    +        // determine schema definition mode (by position or by name)
    +        (isReferenceByPosition(c, exprs), (0 until c.getArity).map(i => c.getTypeAt(i)).toArray)
    +      case t: TypeInformation[_] =>
    +        (false, Array(t))
         }
     
         var fieldNames: List[String] = Nil
         var rowtime: Option[(Int, String)] = None
         var proctime: Option[(Int, String)] = None
     
    +    def checkRowtimeType(t: TypeInformation[_]): Unit = {
    +      if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
    +        throw new TableException(
    +          s"The rowtime attribute can only replace a field with a valid time type, " +
    +          s"such as Timestamp or Long. But was: $t")
    +      }
    +    }
    +
         def extractRowtime(idx: Int, name: String, origName: Option[String]): Unit = {
           if (rowtime.isDefined) {
             throw new TableException(
               "The rowtime attribute can only be defined once in a table schema.")
           } else {
             // if the fields are referenced by position,
             // it is possible to replace an existing field or append the time attribute at the end
    -        if (isReferenceByPosition) {
    -
    -          val mappedIdx = streamType match {
    -            case pti: PojoTypeInfo[_] =>
    -              pti.getFieldIndex(origName.getOrElse(name))
    -            case _ => idx;
    -          }
    -
    +        if (isRefByPos) {
               // check type of field that is replaced
    -          if (mappedIdx < 0) {
    +          if (idx < 0) {
                 throw new TableException(
                   s"The rowtime attribute can only replace a valid field. " +
                     s"${origName.getOrElse(name)} is not a field of type $streamType.")
               }
    -          else if (mappedIdx < fieldTypes.length &&
    -            !(TypeCheckUtils.isLong(fieldTypes(mappedIdx)) ||
    -              TypeCheckUtils.isTimePoint(fieldTypes(mappedIdx)))) {
    -            throw new TableException(
    -              s"The rowtime attribute can only replace a field with a valid time type, " +
    -                s"such as Timestamp or Long. But was: ${fieldTypes(mappedIdx)}")
    +          else if (idx < fieldTypes.length) {
    +            checkRowtimeType(fieldTypes(idx))
    +          }
    +        }
    +        // check for valid alias if referenced by name
    +        else if (origName.isDefined) {
    +          // check for valid alias
    +          streamType match {
    +            case ct: CompositeType[_] if ct.hasField(origName.get) =>
    +              val t = ct.getTypeAt(ct.getFieldIndex(origName.get))
    +              checkRowtimeType(t)
    +            case _ =>
    +              throw new TableException("An alias must always reference an existing field.")
    --- End diff --
    
    Add `origName` (and the existing fields?) to the error message.


---

Mime
View raw message