flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhueske <...@git.apache.org>
Subject [GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Date Fri, 21 Apr 2017 10:36:22 GMT
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112654288
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
---
    @@ -49,6 +59,160 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +
    +    val rexBuilder = relBuilder.getRexBuilder
    +
    +    val operator: SqlAggFunction = agg.getSqlAggFunction()
    +
    +    val relDataType = relBuilder
    +      .getTypeFactory.asInstanceOf[FlinkTypeFactory]
    +      .createTypeFromTypeInfo(agg.resultType)
    +
    +    val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
    +
    +    aggExprs.add(relBuilder.field(aggChildName))
    +
    +    val orderKeys: ImmutableList.Builder[RexFieldCollation] =
    +      new ImmutableList.Builder[RexFieldCollation]()
    +
    +    val sets: util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
    +    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +
    +    val rexNode =
    +      if (orderName.equalsIgnoreCase("rowtime")) {
    +        // for stream event-time
    +        relBuilder.call(EventTimeExtractor)
    +      }
    +      else if (orderName.equalsIgnoreCase("proctime")) {
    +        // for stream proc-time
    +        relBuilder.call(ProcTimeExtractor)
    +      } else {
    +        // for batch event-time
    +        relBuilder.field(orderName)
    +      }
    +
    +    orderKeys.add(new RexFieldCollation(rexNode, sets))
    +
    +    val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    overWindow.partitionBy.foreach {
    +      x =>
    +        val partitionKey = relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)
    +        if (!FlinkTypeFactory.toTypeInfo(partitionKey.getType).isKeyType) {
    +          throw ValidationException(
    +            s"expression $partitionKey cannot be used as a partition key expression " +
    +            "because it's not a valid key type which must be hashable and comparable")
    +        }
    +        partitionKeys.add(partitionKey)
    +    }
    +
    +    val preceding = overWindow.preceding.asInstanceOf[Literal]
    +    val following = overWindow.following.asInstanceOf[Literal]
    +
    +    val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
    +
    +    val lowerBound = createBound(relBuilder, preceding, SqlKind.PRECEDING)
    +    val upperBound = createBound(relBuilder, following, SqlKind.FOLLOWING)
    +
    +    rexBuilder.makeOver(
    +      relDataType,
    +      operator,
    +      aggExprs,
    +      partitionKeys,
    +      orderKeys.build,
    +      lowerBound,
    +      upperBound,
    +      isPhysical,
    +      true,
    +      false)
    +  }
    +
    +  private def createBound(
    +    relBuilder: RelBuilder,
    +    bound: Literal,
    +    sqlKind: SqlKind): RexWindowBound = {
    +
    +    if (bound == UNBOUNDED_RANGE || bound == UNBOUNDED_ROW) {
    +      val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
    +      create(unbounded, null)
    +    } else if (bound == CURRENT_RANGE || bound == CURRENT_ROW) {
    +      val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
    +      create(currentRow, null)
    +    } else {
    +      val returnType = relBuilder
    +        .getTypeFactory.asInstanceOf[FlinkTypeFactory]
    +        .createTypeFromTypeInfo(Types.DECIMAL)
    +
    +      val sqlOperator = new SqlPostfixOperator(
    +        sqlKind.name,
    +        sqlKind,
    +        2,
    +        new OrdinalReturnTypeInference(0),
    +        null,
    +        null)
    +
    +      val operands: Array[SqlNode] = new Array[SqlNode](1)
    +      operands(0) = (SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO))
    +
    +      val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
    +
    +      val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +      expressions.add(relBuilder.literal(bound.value))
    +
    +      val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions)
    +
    +      create(node, rexNode)
    +    }
    +  }
    +
    +  override private[flink] def children: Seq[Expression] = Seq(agg)
    +
    +  override def toString = s"${this.getClass.getCanonicalName}(${overWindowAlias.toString})"
    +
    +  override private[flink] def resultType = agg.resultType
    +
    +  override private[flink] def validateInput(): ValidationResult = {
    +    var validationResult: ValidationResult = ValidationSuccess
    +    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +    if (!orderName.equalsIgnoreCase("rowtime")
    --- End diff --
    
    If we replace the `UnresolvedFieldReference` by a `RowTime` or `ProcTime` expression,
we should check here if it is a `TimeIndicator` or a valid field reference. This is the same
check that we need to do for the `partitionBy` fields.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message