flink-issues mailing list archives

From fhueske <...@git.apache.org>
Subject [GitHub] flink pull request #4471: [FLINK-6094] [table] Implement stream-stream proct...
Date Thu, 10 Aug 2017 14:33:38 GMT
Github user fhueske commented on a diff in the pull request:

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
    @@ -0,0 +1,93 @@
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.rules.datastream
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.convert.ConverterRule
    +import org.apache.flink.table.api.TableConfig
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.plan.nodes.FlinkConventions
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin
    +import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
    +import org.apache.flink.table.plan.schema.RowSchema
    +import org.apache.flink.table.runtime.join.WindowJoinUtil
    +import scala.collection.JavaConverters._
    +class DataStreamJoinRule
    +  extends ConverterRule(
    +    classOf[FlinkLogicalJoin],
    +    FlinkConventions.LOGICAL,
    +    FlinkConventions.DATASTREAM,
    +    "DataStreamJoinRule") {
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +    val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
    +    val joinInfo = join.analyzeCondition
    +    val (windowBounds, remainingPreds) = WindowJoinUtil.extractWindowBoundsFromPredicate(
    +      joinInfo.getRemaining(join.getCluster.getRexBuilder),
    +      join.getLeft.getRowType.getFieldCount,
    +      join.getRowType,
    +      join.getCluster.getRexBuilder,
    +      TableConfig.DEFAULT)
    +    // remaining predicate must not access time attributes
    +    val remainingPredsAccessTime = remainingPreds.isDefined &&
    +      WindowJoinUtil.accessesTimeAttribute(remainingPreds.get, join.getRowType)
    +    // Check that no event-time attributes are in the input.
    +    val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala
    +      .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
    +    if (!windowBounds.isDefined && !remainingPredsAccessTime && !rowTimeAttrInOutput)
    --- End diff --
    I'm not sure if this is the right condition for this join implementation.
    I think we need to check that both inputs have a finite size, i.e., there is a bounded
number of records in the table at a given point in time. This is important because we need
to keep all active records of both inputs in state.
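    To illustrate why both inputs must be bounded, here is a minimal sketch over a simplified in-memory model. The class `UnboundedJoin` and its methods are hypothetical and not part of this PR or of Flink's API; a real operator would keep this data in keyed state. Without a window bound, no row can ever be dropped, because a later row on the other side may still match it. On an append-only input nothing is retracted either, so the state grows with the stream.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of a non-windowed stream-stream inner join.
// Rows are (key, value) pairs; local maps stand in for Flink keyed state.
class UnboundedJoin[K, L, R] {
  // Every row seen on each side is retained, since a future row
  // on the other side may still match it.
  private val leftState = mutable.Map.empty[K, mutable.ArrayBuffer[L]]
  private val rightState = mutable.Map.empty[K, mutable.ArrayBuffer[R]]

  // Store the new left row, then emit it joined with all stored right rows.
  def onLeft(key: K, row: L): Seq[(L, R)] = {
    leftState.getOrElseUpdate(key, mutable.ArrayBuffer.empty[L]) += row
    rightState.getOrElse(key, mutable.ArrayBuffer.empty[R]).toSeq.map(r => (row, r))
  }

  // Store the new right row, then emit it joined with all stored left rows.
  def onRight(key: K, row: R): Seq[(L, R)] = {
    rightState.getOrElseUpdate(key, mutable.ArrayBuffer.empty[R]) += row
    leftState.getOrElse(key, mutable.ArrayBuffer.empty[L]).toSeq.map(l => (l, row))
  }

  // Number of rows held in state; on append-only inputs it only ever grows.
  def stateSize: Int =
    leftState.values.map(_.size).sum + rightState.values.map(_.size).sum
}
```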
    A table which is derived from a stream can have a finite size if:
    - it is the result of a non-windowed aggregation (`GROUP BY user`)
    - it is derived by an update stream->table conversion
    - it is the tail of a stream (e.g., `WHERE rowtime > now() - INTERVAL '4' HOUR`)
    - there might be more cases.
    The windowed join is different in that it can process unbounded tables, because it can
limit the valid data ranges with the window boundaries. So, just because a join cannot be
computed with the windowed join does not automatically mean we can process it with this implementation.
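    By contrast, a time-bounded join can clean up its state. A minimal sketch, assuming a symmetric bound `l.time BETWEEN r.time - bound AND r.time + bound` and rows arriving in ascending time order; `WindowedJoinState` and `evict` are hypothetical names, not Flink API, and `now` stands in for a watermark or the current processing time.

```scala
import scala.collection.mutable

// Hypothetical sketch of the state of one join input in a time-windowed join
// with the condition: l.time BETWEEN r.time - bound AND r.time + bound.
// Assumes rows are added in ascending time order.
class WindowedJoinState[T](bound: Long) {
  private val rows = mutable.Queue.empty[(Long, T)]

  def add(time: Long, row: T): Unit = rows.enqueue((time, row))

  // A row with timestamp t can only match partners with timestamps up to t + bound.
  // Once time has advanced past t + bound, no future row can match it, so it is dropped.
  // Returns the number of evicted rows.
  def evict(now: Long): Int = {
    var evicted = 0
    while (rows.nonEmpty && rows.head._1 + bound < now) {
      rows.dequeue()
      evicted += 1
    }
    evicted
  }

  def size: Int = rows.size
}
```

With `bound = 10`, a row at time 0 is evicted once time passes 10, while a row at time 5 is kept until time passes 15. This is the property the non-windowed join lacks.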
    Right now, only the non-windowed aggregation case is supported, so it is the only way to obtain
a finite table from a stream.
    In `DataSetSingleRowJoinRule.isSingleRow()` we check whether an input rel produces only a single
row by tracing back through its inputs. I think we need similar logic here to ensure that the input
relation is of finite size (in our case, that it is derived from a non-windowed `Aggregate`).
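    The tracing-back check could look roughly like the following. This is a sketch over a hypothetical, simplified plan model rather than Calcite's `RelNode` classes (all names in `FiniteInput` are illustrative); it only recognizes the non-windowed aggregation case, matching what is supported right now. A real rule would operate on Calcite nodes and likely has to look through more node types.

```scala
object FiniteInput {
  // Hypothetical, simplified logical plan standing in for Calcite's RelNode hierarchy.
  sealed trait Plan
  case class Scan(table: String) extends Plan // append-only stream source, treated as unbounded
  case class Filter(input: Plan) extends Plan
  case class Project(input: Plan) extends Plan
  case class Aggregate(input: Plan, windowed: Boolean) extends Plan
  case class Join(left: Plan, right: Plan) extends Plan

  // Walks back through operators that cannot increase the table size
  // until it reaches a node that decides whether the input is finite.
  def isFinite(plan: Plan): Boolean = plan match {
    case Filter(in)             => isFinite(in)
    case Project(in)            => isFinite(in)
    // A non-windowed GROUP BY keeps one (updated) row per group, which is
    // the case treated as finite here; a windowed aggregate appends results.
    case Aggregate(_, windowed) => !windowed
    case _                      => false
  }

  // The non-windowed join would only be chosen if both inputs are finite.
  def canUseNonWindowedJoin(join: Join): Boolean =
    isFinite(join.left) && isFinite(join.right)
}
```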

