flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6090) Add RetractionRule at the stage of decoration
Date Wed, 12 Apr 2017 20:11:43 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15966531#comment-15966531
] 

ASF GitHub Bot commented on FLINK-6090:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3696#discussion_r110401510
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRetractionRule.scala
---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.RelNode
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ListBuffer
    +
    +/**
    +  * Collection of retraction rules that apply various transformations on DataStreamRel
trees.
    +  * Currently, there are three transformations: InitProcessRule, NeedToRetractProcessRule
and
    +  * AccModeProcessRule. Note: these rules must be called in order (InitProcessRule ->
    +  * NeedToRetractProcessRule -> AccModeProcessRule).
    +  */
    +object DataStreamRetractionRule {
    +
    +  /**
    +    * Singleton rule that init retraction trait inside a [[DataStreamRel]]
    +    */
    +  val INIT_INSTANCE = new InitProcessRule()
    +
    +  /**
    +    * Singleton rule that decide needToRetract property inside a [[DataStreamRel]]
    +    */
    +  val NEEDTORETRACT_INSTANCE = new NeedToRetractProcessRule()
    +
    +  /**
    +    * Singleton rule that decide accMode inside a [[DataStreamRel]]
    +    */
    +  val ACCMODE_INSTANCE = new AccModeProcessRule()
    +
    +  /**
    +    * Get all child RelNodes of a RelNode
    +    * @param topRel The input RelNode
    +    * @return All child nodes
    +    */
    +  def getChildRelNodes(topRel: RelNode): ListBuffer[RelNode] = {
    +    val topRelInputs = new ListBuffer[RelNode]()
    +    topRelInputs.++=(topRel.getInputs.asScala)
    +    topRelInputs.transform(e => e.asInstanceOf[HepRelVertex].getCurrentRel)
    +  }
    +
    +  def traitSetContainNeedToRetract(traitSet: RelTraitSet): Boolean = {
    +    val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +    if (null == retractionTrait) {
    +      false
    +    } else {
    +      retractionTrait.getNeedToRetract
    +    }
    +  }
    +
    +
    +  /**
    +    * Find all needToRetract nodes. A node needs to retract means that there are downstream
    +    * nodes need retraction from it. Currently, [[DataStreamOverAggregate]] and
    +    * [[DataStreamGroupWindowAggregate]] need retraction from upstream nodes, besides,
a
    +    * needToRetract node also need retraction from it's upstream nodes.
    +    */
    +  class NeedToRetractProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "NeedToRetractProcessRule") {
    +
    +    /**
    +      * Return true if bottom RelNode does not contain needToRetract and top RelNode
need
    +      * retraction from bottom RelNode. Currently, operators which contain aggregations
need
    +      * retraction from upstream nodes, besides, a needToRetract node also needs retraction
from
    +      * it's upstream nodes.
    +      */
    +    def bottomNeedToRetract(topRel: RelNode, bottomRel: RelNode): Boolean = {
    +      val bottomTraits = bottomRel.getTraitSet
    +      if(!traitSetContainNeedToRetract(bottomTraits)){
    +        topRel match {
    +          case _: DataStreamGroupAggregate => true
    +          case _: DataStreamGroupWindowAggregate => true
    +          case _: DataStreamOverAggregate => true
    +          case _ if traitSetContainNeedToRetract(topRel.getTraitSet) => true
    +          case _ => false
    +        }
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Add needToRetract for the input RelNode
    +      */
    +    def addNeedToRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(true, AccMode.Acc)
    +      } else {
    +        retractionTrait = new RetractionTrait(true, retractionTrait.getAccMode)
    +      }
    +
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Returns a new topRel and a needTransform flag for a given topRel and bottomRels.
The new
    +      * topRel contains new bottomRels with needToRetract properly marked. The needTransform
flag
    +      * will be true if any transformation has been done.
    +      *
    +      * @param topRel The input top RelNode.
    +      * @param bottomRels The input bottom RelNodes.
    +      * @return A tuple holding a new top RelNode and a needTransform flag
    +      */
    +    def needToRetractProcess(
    +        topRel: RelNode,
    +        bottomRels: ListBuffer[RelNode])
    +    : (RelNode, Boolean) = {
    +
    +      var needTransform = false
    +      var i = 0
    +      while(i < bottomRels.size) {
    +        val bottomRel = bottomRels(i)
    +        if(bottomNeedToRetract(topRel, bottomRel)) {
    +          needTransform = true
    +          bottomRels(i) = addNeedToRetract(bottomRel)
    +        }
    +        i += 1
    +      }
    +      val newTopRel = topRel.copy(topRel.getTraitSet, bottomRels.asJava)
    +      (newTopRel, needTransform)
    +    }
    +
    +    override def onMatch(call: RelOptRuleCall): Unit = {
    +      val topRel = call.rel(0).asInstanceOf[DataStreamRel]
    +
    +      // get bottom relnodes
    +      val bottomRels = getChildRelNodes(topRel)
    +
    +      // process bottom relnodes
    +      val (newTopRel, needTransform) = needToRetractProcess(topRel, bottomRels)
    +
    +      if (needTransform) {
    +        call.transformTo(newTopRel)
    +      }
    +    }
    +
    +  }
    +
    +
    +  /**
    +    * Find all AccRetract nodes. A node in AccRetract Mode means that the operator may
generate
    +    * or forward an additional retraction message.
    +    *
    +    */
    +  class AccModeProcessRule extends RelOptRule(
    +    operand(
    +      classOf[DataStreamRel], none()),
    +    "AccModeProcessRule") {
    +
    +
    +    /**
    +      * Return true if result table is a replace table
    +      */
    +    def resultTableIsReplaceTable(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    +        case _ => false
    +      }
    +    }
    +
    +
    +    def traitSetContainAccRetract(traitSet: RelTraitSet): Boolean = {
    +      val retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        false
    +      } else {
    +        retractionTrait.getAccMode == AccMode.AccRetract
    +      }
    +    }
    +
    +    /**
    +      * Return true if the result table of input RelNode is a replace table and the input
RelNode
    +      * is under AccMode with needToRetract.
    +      */
    +    def needSetAccRetract(relNode: RelNode): Boolean = {
    +      val traitSet = relNode.getTraitSet
    +      if (traitSetContainNeedToRetract(traitSet)
    +        && resultTableIsReplaceTable(relNode)
    +        && !traitSetContainAccRetract(traitSet)) {
    +        true
    +      } else {
    +        false
    +      }
    +    }
    +
    +    /**
    +      * Set AccMode to AccRetract for the input RelNode
    +      */
    +    def setAccRetract(relNode: RelNode): RelNode = {
    +      val traitSet = relNode.getTraitSet
    +      var retractionTrait = traitSet.getTrait(RetractionTraitDef.INSTANCE)
    +      if (null == retractionTrait) {
    +        retractionTrait = new RetractionTrait(false, AccMode.AccRetract)
    +      } else {
    +        retractionTrait = new RetractionTrait(retractionTrait.getNeedToRetract,
    +                                              AccMode.AccRetract)
    +      }
    +      relNode.copy(traitSet.plus(retractionTrait), relNode.getInputs)
    +    }
    +
    +    /**
    +      * Currently, window (including group window and over window) does not contain early
firing,
    +      * so [[DataStreamGroupWindowAggregate]] and [[DataStreamOverAggregate]] will digest
    +      * retraction messages. [[DataStreamGroupAggregate]] can also digest retraction
because
    +      * it digests retraction first then generate new retraction messages by itself.
    +      */
    +    def canDigestRetraction(relNode: RelNode): Boolean = {
    +      relNode match {
    +        case _: DataStreamGroupAggregate => true
    +        case _: DataStreamGroupWindowAggregate => true
    +        case _: DataStreamOverAggregate => true
    +        case _ => false
    +      }
    +    }
    +
    +    /**
    +      * Return true if topRel needs to work under AccRetract
    +      */
    +    def topRelNeedSetAccRetract(topRel: RelNode, bottomRels: ListBuffer[RelNode]): Boolean
= {
    +      var needSet: Boolean = false
    +      var bottomRelsUnderAccRetract: Boolean = false
    +      val traitSet = topRel.getTraitSet
    +
    +      // check if top RelNode needs to work under AccRetract by itself
    +      needSet = needSetAccRetract(topRel)
    --- End diff --
    
    ```
    val topNeedsAccRetract = needSetAccRetract(topRel)
    ```


> Add RetractionRule at the stage of decoration
> ---------------------------------------------
>
>                 Key: FLINK-6090
>                 URL: https://issues.apache.org/jira/browse/FLINK-6090
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Hequn Cheng
>
> Implement optimizer for retraction:
>   1.Add RetractionRule at the stage of decoration´╝îwhich can derive the replace table/append
table, NeedRetraction property.
>   2.Match the NeedRetraction and replace table, mark the accumulating mode
>  
> When this task is finished, we can turn on retraction for different operators according
to accumulating mode.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message