flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From haohui <...@git.apache.org>
Subject [GitHub] flink pull request #3665: [FLINK-6011] Support TUMBLE, HOP, SESSION window i...
Date Wed, 05 Apr 2017 08:33:50 GMT
Github user haohui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3665#discussion_r109857875
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
---
    @@ -117,46 +119,86 @@ class LogicalWindowAggregateRule
       }
     
       private def identifyWindow(field: RexNode): Option[Window] = {
    -    // Detects window expressions by pattern matching
    -    //   supported patterns: FLOOR(time AS xxx) and CEIL(time AS xxx),
    -    //   with time being equal to proctime() or rowtime()
         field match {
           case call: RexCall =>
             call.getOperator match {
    -          case _: SqlFloorFunction =>
    -            val operand = call.getOperands.get(1).asInstanceOf[RexLiteral]
    -            val unit: TimeUnitRange = operand.getValue.asInstanceOf[TimeUnitRange]
    -            val w = LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit)
    -            call.getType match {
    -              case TimeModeTypes.PROCTIME =>
    -                return Some(w)
    -              case TimeModeTypes.ROWTIME =>
    -                return Some(w.on("rowtime"))
    -              case _ =>
    -            }
    -          case _ =>
    +          case _: SqlFloorFunction => FloorWindowTranslator(call).toWindow
    +          case SqlStdOperatorTable.TUMBLE => TumbleWindowTranslator(call).toWindow
    +          case SqlStdOperatorTable.HOP => SlidingWindowTranslator(call).toWindow
    +          case SqlStdOperatorTable.SESSION => SessionWindowTranslator(call).toWindow
    +          case _ => None
             }
    -      case _ =>
    +      case _ => None
         }
    -    None
       }
    -
     }
     
    -object LogicalWindowAggregateRule {
    +private abstract class WindowTranslator {
    +  val call: RexCall
     
    -  private[flink] val LOGICAL_WINDOW_PREDICATE = RelOptRule.operand(classOf[LogicalAggregate],
    -    RelOptRule.operand(classOf[LogicalProject], RelOptRule.none()))
    +  protected def unwrapLiteral[T](node: RexNode): T =
    +    node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T]
     
    -  private[flink] val INSTANCE = new LogicalWindowAggregateRule
    +  protected def getOperandAsLong(idx: Int): Long =
    +    unwrapLiteral[BigDecimal](call.getOperands.get(idx)).longValue()
    --- End diff --
    
    Agree. 97d1a45 will throw a `TableException` if the configuration is not fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message