From issues-return-156979-archive-asf-public=cust-asf.ponee.io@flink.apache.org Tue Mar 6 05:46:48 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 06225180608 for ; Tue, 6 Mar 2018 05:46:47 +0100 (CET) Received: (qmail 91262 invoked by uid 500); 6 Mar 2018 04:46:46 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 91253 invoked by uid 99); 6 Mar 2018 04:46:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Mar 2018 04:46:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D6014F6440; Tue, 6 Mar 2018 04:46:45 +0000 (UTC) From: walterddr To: issues@flink.apache.org Reply-To: issues@flink.apache.org References: In-Reply-To: Subject: [GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w... Content-Type: text/plain Message-Id: <20180306044645.D6014F6440@git1-us-west.apache.org> Date: Tue, 6 Mar 2018 04:46:45 +0000 (UTC) Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r172408402 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowLeftJoinWithNonEquiPredicates.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Connect data for left stream and right stream. Only used for LeftJoin with NonEquiPredicates. + * A MapState of type [Row, Long] is added to record how many rows from the right table can be + * matched for each left row. Left join without NonEquiPredicates doesn't need it because + * left rows can always join right rows as long as the join keys are the same. 
+ * + * @param leftType the input type of left stream + * @param rightType the input type of right stream + * @param resultType the output type of join + * @param genJoinFuncName the function name of other non-equi condition + * @param genJoinFuncCode the function code of other non-equi condition + * @param queryConfig the configuration for the query to generate + */ +class NonWindowLeftJoinWithNonEquiPredicates( + leftType: TypeInformation[Row], + rightType: TypeInformation[Row], + resultType: TypeInformation[CRow], + genJoinFuncName: String, + genJoinFuncCode: String, + queryConfig: StreamQueryConfig) + extends NonWindowJoin( + leftType, + rightType, + resultType, + genJoinFuncName, + genJoinFuncCode, + queryConfig) { + + // result row, all fields from the right will be null + private var resultRow: Row = _ + // how many matched rows from the right table for each left row + private var leftJoinCnt: MapState[Row, Long] = _ + + override def open(parameters: Configuration): Unit = { + super.open(parameters) + + val leftJoinCntDescriptor = new MapStateDescriptor[Row, Long]( --- End diff -- I think either is fine as long as they are consistent. ---