Date: Tue, 4 Jul 2017 14:30:01 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-6232) Support proctime inner equi-join between two streams in the SQL API


    [ https://issues.apache.org/jira/browse/FLINK-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073724#comment-16073724 ]

ASF GitHub Bot commented on FLINK-6232:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3715#discussion_r125443593

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeWindowInnerJoin.scala ---
    @@ -0,0 +1,326 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.join
    +
    +import java.util
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.functions.FlatJoinFunction
    +import org.apache.flink.api.common.state._
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    +import org.apache.flink.table.codegen.Compiler
    +import org.apache.flink.table.runtime.CRowWrappingCollector
    +import org.apache.flink.table.runtime.types.CRow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
    +
    +/**
    +  * A CoProcessFunction to support stream join stream, currently just support inner-join
    +  *
    +  * @param leftLowerBound
    +  *        the left stream lower bound, and -leftLowerBound is the right stream upper bound
    +  * @param leftUpperBound
    +  *        the left stream upper bound, and -leftUpperBound is the right stream lower bound
    +  * @param element1Type  the input type of left stream
    +  * @param element2Type  the input type of right stream
    +  * @param genJoinFuncName    the function code of other non-equi condition
    +  * @param genJoinFuncCode    the function name of other non-equi condition
    +  *
    +  */
    +class ProcTimeWindowInnerJoin(
    +    private val leftLowerBound: Long,
    +    private val leftUpperBound: Long,
    +    private val element1Type: TypeInformation[Row],
    +    private val element2Type: TypeInformation[Row],
    +    private val genJoinFuncName: String,
    +    private val genJoinFuncCode: String)
    +  extends CoProcessFunction[CRow, CRow, CRow]
    +    with Compiler[FlatJoinFunction[Row, Row, Row]]{
    +
    +  private var cRowWrapper: CRowWrappingCollector = _
    +
    +  /** other condition function **/
    +  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
    +
    +  /** tmp list to store expired records **/
    +  private var listToRemove: JList[Long] = _
    +
    +  /** state to hold left stream element **/
    +  private var row1MapState: MapState[Long, JList[Row]] = _
    +  /** state to hold right stream element **/
    +  private var row2MapState: MapState[Long, JList[Row]] = _
    +
    +  /** state to record last timer of left stream, 0 means no timer **/
    +  private var timerState1: ValueState[Long] = _
    +  /** state to record last timer of right stream, 0 means no timer **/
    +  private var timerState2: ValueState[Long] = _
    +
    +  private val leftStreamWinSize: Long = if (leftLowerBound < 0) -leftLowerBound else 0
    +  private val rightStreamWinSize: Long = if (leftUpperBound > 0) leftUpperBound else 0
    +
    +  val LOG = LoggerFactory.getLogger(this.getClass)
    +
    +  override def open(config: Configuration) {
    +    LOG.debug(s"Compiling JoinFunction: $genJoinFuncName \n\n " +
    +      s"Code:\n$genJoinFuncCode")
    +    val clazz = compile(
    +      getRuntimeContext.getUserCodeClassLoader,
    +      genJoinFuncName,
    +      genJoinFuncCode)
    +    LOG.debug("Instantiating JoinFunction.")
    +    joinFunction = clazz.newInstance()
    +
    +    listToRemove = new util.ArrayList[Long]()
    +    cRowWrapper = new CRowWrappingCollector()
    +
    +    // initialize row state
    +    val rowListTypeInfo1: TypeInformation[JList[Row]] = new ListTypeInfo[Row](element1Type)
    +    val mapStateDescriptor1: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("row1mapstate",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo1)
    +    row1MapState = getRuntimeContext.getMapState(mapStateDescriptor1)
    +
    +    val rowListTypeInfo2: TypeInformation[JList[Row]] = new ListTypeInfo[Row](element2Type)
    +    val mapStateDescriptor2: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("row2mapstate",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo2)
    +    row2MapState = getRuntimeContext.getMapState(mapStateDescriptor2)
    +
    +    // initialize timer state
    +    val valueStateDescriptor1: ValueStateDescriptor[Long] =
    +      new ValueStateDescriptor[Long]("timervaluestate1", classOf[Long])
    +    timerState1 = getRuntimeContext.getState(valueStateDescriptor1)
    +
    +    val valueStateDescriptor2: ValueStateDescriptor[Long] =
    +      new ValueStateDescriptor[Long]("timervaluestate2", classOf[Long])
    +    timerState2 = getRuntimeContext.getState(valueStateDescriptor2)
    +  }
    +
    +  /**
    +    * Process left stream records
    +    *
    +    * @param valueC The input value.
    +    * @param ctx   The ctx to register timer or get current time
    +    * @param out   The collector for returning result values.
    +    *
    +    */
    +  override def processElement1(
    +      valueC: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +
    +    processElement(
    +      valueC,
    +      ctx,
    +      out,
    +      leftStreamWinSize,
    +      timerState1,
    +      row1MapState,
    +      row2MapState,
    +      -leftUpperBound,     // right stream lower
    +      -leftLowerBound,     // right stream upper
    +      true
    +    )
    +  }
    +
    +  /**
    +    * Process right stream records
    +    *
    +    * @param valueC The input value.
    +    * @param ctx   The ctx to register timer or get current time
    +    * @param out   The collector for returning result values.
    +    *
    +    */
    +  override def processElement2(
    +      valueC: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +
    +    processElement(
    +      valueC,
    +      ctx,
    +      out,
    +      rightStreamWinSize,
    +      timerState2,
    +      row2MapState,
    +      row1MapState,
    +      leftLowerBound,    // left stream upper
    +      leftUpperBound,    // left stream upper
    +      false
    +    )
    +  }
    +
    +  /**
    +    * Called when a processing timer trigger.
    +    * Expire left/right records which earlier than current time - windowsize.
    +    *
    +    * @param timestamp The timestamp of the firing timer.
    +    * @param ctx       The ctx to register timer or get current time
    +    * @param out       The collector for returning result values.
    +    */
    +  override def onTimer(
    +      timestamp: Long,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
    +      out: Collector[CRow]): Unit = {
    +
    +    if (timerState1.value == timestamp) {
    +      expireOutTimeRow(
    +        timestamp,
    +        leftStreamWinSize,
    +        row1MapState,
    +        timerState1,
    +        ctx
    +      )
    +    }
    +
    +    if (timerState2.value == timestamp) {
    +      expireOutTimeRow(
    +        timestamp,
    +        rightStreamWinSize,
    +        row2MapState,
    +        timerState2,
    +        ctx
    +      )
    +    }
    +  }
    +
    +  /**
    +    * Puts an element from the input stream into state and search the other state to
    +    * output records meet the condition, and registers a timer for the current record
    +    * if there is no timer at present.
    +    */
    +  private def processElement(
    +      valueC: CRow,
    +      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    +      out: Collector[CRow],
    +      winSize: Long,
    +      timerState: ValueState[Long],
    +      rowMapState: MapState[Long, JList[Row]],
    +      oppoRowMapState: MapState[Long, JList[Row]],
    +      oppoLowerBound: Long,
    +      oppoUpperBound: Long,
    +      isLeft: Boolean): Unit = {
    +
    +    cRowWrapper.out = out
    +    cRowWrapper.setChange(valueC.change)
    --- End diff --

    We can set this to `true` once, when initializing `cRowWrapper`. The join does not support any kind of updates (or deletes).


> Support proctime inner equi-join between two streams in the SQL API
> -------------------------------------------------------------------
>
>                 Key: FLINK-6232
>                 URL: https://issues.apache.org/jira/browse/FLINK-6232
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: hongyuhong
>            Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.proctime, o.productId, o.orderId, s.proctime AS shipTime
> FROM Orders AS o
> JOIN Shipments AS s
> ON o.orderId = s.orderId
> AND o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * Only inner joins are supported.
> * The ON clause must include an equi-join condition.
> * The time condition, e.g. {{o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR}}, may only use proctime, which is a system attribute. It must describe a bounded time range such as {{o.proctime BETWEEN s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR}}; unbounded conditions such as {{o.proctime > s.proctime}} are not supported. It must also reference the proctime attributes of both streams, so {{o.proctime BETWEEN proctime() AND proctime() + 1}} is not supported either.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
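
For readers following the bound handling in the quoted diff, here is a minimal sketch of the arithmetic only, with no Flink dependency. `JoinBoundsSketch`, `Bounds`, `derive` and `inWindow` are hypothetical names and are not part of the pull request. The window sizes and the mirrored right-stream bounds follow the expressions in the diff. The `inWindow` check assumes the bounds constrain `leftTime - rightTime`, which the quoted part of the diff does not confirm.

```scala
// Hypothetical helper, not part of PR #3715: mirrors the bound arithmetic of
// ProcTimeWindowInnerJoin so it can be checked in isolation.
object JoinBoundsSketch {

  /** Retention sizes and mirrored bounds derived from the left stream's bounds (milliseconds). */
  final case class Bounds(
      leftWinSize: Long,
      rightWinSize: Long,
      rightLower: Long,
      rightUpper: Long)

  def derive(leftLowerBound: Long, leftUpperBound: Long): Bounds =
    Bounds(
      // Same expressions as leftStreamWinSize / rightStreamWinSize in the diff.
      leftWinSize = if (leftLowerBound < 0) -leftLowerBound else 0L,
      rightWinSize = if (leftUpperBound > 0) leftUpperBound else 0L,
      // Same mirroring as the arguments passed in processElement1.
      rightLower = -leftUpperBound,
      rightUpper = -leftLowerBound)

  /** ASSUMPTION: a pair joins when leftLowerBound <= leftTime - rightTime <= leftUpperBound. */
  def inWindow(
      leftTime: Long,
      rightTime: Long,
      leftLowerBound: Long,
      leftUpperBound: Long): Boolean = {
    val diff = leftTime - rightTime
    diff >= leftLowerBound && diff <= leftUpperBound
  }

  def main(args: Array[String]): Unit = {
    val hour = 60L * 60L * 1000L
    // o.proctime BETWEEN s.proctime AND s.proctime + INTERVAL '1' HOUR
    println(derive(0L, hour))
    // o.proctime BETWEEN s.proctime - INTERVAL '1' HOUR AND s.proctime + INTERVAL '1' HOUR
    println(derive(-hour, hour))
  }
}
```

Under that assumption, the query from the issue description (lower bound 0, upper bound one hour) needs no retention on the left side and one hour of retention on the right side.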