flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6149) add additional flink logical relation nodes
Date Tue, 18 Apr 2017 02:25:41 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15972015#comment-15972015
] 

ASF GitHub Bot commented on FLINK-6149:
---------------------------------------

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3594#discussion_r111859756
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/rel/logical/FlinkLogicalSort.scala
---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.rel.logical
    +
    +import org.apache.calcite.plan._
    +import org.apache.calcite.rel.RelFieldCollation.Direction
    +import org.apache.calcite.rel.convert.ConverterRule
    +import org.apache.calcite.rel.core.Sort
    +import org.apache.calcite.rel.logical.LogicalSort
    +import org.apache.calcite.rel.metadata.RelMetadataQuery
    +import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
    +import org.apache.calcite.rex.{RexLiteral, RexNode}
    +import org.apache.flink.api.common.operators.Order
    +import org.apache.flink.table.rel.FlinkConventions
    +
    +import scala.collection.JavaConverters._
    +
    +class FlinkLogicalSort(
    +    cluster: RelOptCluster,
    +    traits: RelTraitSet,
    +    child: RelNode,
    +    collation: RelCollation,
    +    offset: RexNode,
    +    fetch: RexNode)
    +  extends Sort(cluster, traits, child, collation, offset, fetch)
    +  with FlinkLogicalRel {
    +
    +  private val limitStart: Long = if (offset != null) {
    +    RexLiteral.intValue(offset)
    +  } else {
    +    0L
    +  }
    +
    +  private val limitEnd: Long = if (fetch != null) {
    +    RexLiteral.intValue(fetch) + limitStart
    +  } else {
    +    Long.MaxValue
    +  }
    +
    +  val getOffset: RexNode = offset
    +
    +  val getFetch: RexNode = fetch
    +
    +  override def copy(
    +      traitSet: RelTraitSet,
    +      newInput: RelNode,
    +      newCollation: RelCollation,
    +      offset: RexNode,
    +      fetch: RexNode): Sort = {
    +
    +    new FlinkLogicalSort(cluster, traitSet, newInput, newCollation, offset, fetch)
    +  }
    +
    +  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
    +    val inputRowCnt = metadata.getRowCount(this.getInput)
    +    if (inputRowCnt == null) {
    +      inputRowCnt
    +    } else {
    +      val rowCount = (inputRowCnt - limitStart).max(1.0)
    +      if (fetch != null) {
    +        val limit = RexLiteral.intValue(fetch)
    +        rowCount.min(limit)
    +      } else {
    +        rowCount
    +      }
    +    }
    +  }
    +
    +  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost
= {
    +    // by default, assume cost is proportional to number of rows
    +    val rowCount: Double = mq.getRowCount(this)
    +    planner.getCostFactory.makeCost(rowCount, rowCount, 0)
    +  }
    +
    +  override def explainTerms(pw: RelWriter) : RelWriter = {
    --- End diff --
    
    yes, these can be removed


> add additional flink logical relation nodes
> -------------------------------------------
>
>                 Key: FLINK-6149
>                 URL: https://issues.apache.org/jira/browse/FLINK-6149
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Kurt Young
>            Assignee: Kurt Young
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message