Date: Mon, 14 Aug 2017 08:53:00 +0000 (UTC)
From: "Xingcan Cui (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Comment Edited] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
Mailing-List: contact issues-help@flink.apache.org; run by ezmlm
archived-at: Mon, 14 Aug 2017 08:53:06 -0000

    [ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125182#comment-16125182 ]

Xingcan Cui edited comment on FLINK-6233 at 8/14/17 8:52 AM:
-------------------------------------------------------------

Hi [~fhueske],

thanks for the previous work by you and yuhong; the implementation is going well. Nevertheless, I have some minor questions/ideas.

# The core logic of the rowtime inner join and the proctime inner join is almost the same. Can I extract an abstract {{TimeWindowInnerJoin}} class and let {{ProcTimeWindowInnerJoin}} and {{RowTimeWindowInnerJoin}} extend it?
# The cleanup of the cached data is triggered by ProcessingTimeTimers in {{ProcTimeWindowInnerJoin}}. For {{RowTimeWindowInnerJoin}}, I think the cleanup could be triggered directly by the watermarks, without registering an EventTimeTimer, right?
# Since the collections provided by the state backend are simple, searching for out-of-date records may be inefficient. I think the current "short-circuit" code (shown below) cannot clean up all the expired data.
{code:java}
while (keyIter.hasNext && !validTimestamp) {
  val recordTime = keyIter.next
  if (recordTime < expiredTime) {
    removeList.add(recordTime)
  } else {
    // we found a timestamp that is still valid
    validTimestamp = true
  }
}
{code}
To cope with that, I plan to split the "cache window" into contiguous static panes and expire each pane as a whole. This way we may store some extra records, covering a time interval equal to the static span of a pane, but we can remove the expired data efficiently.
# I'd like to introduce an extra {{allowLateness}} parameter (which can be set in the {{StreamQueryConfig}}) to the join function. For now, I'll give it a default value of {{0L}}.

There's an extra problem. As I mentioned before, the two streams in the {{CoProcessOperator}} share the same {{InternalTimeServiceManager}}, which means their watermarks are forcibly synchronized to the lower one. I understand why it is designed this way, but I still think we should provide separate time services for the two input streams. After all, we cannot assume that the rowtimes of the two streams are naturally synchronized. However, I can provide an implementation based on the current mechanism and do further optimizations in the future.

What do you think?

> Support rowtime inner equi-join between two streams in the SQL API
> ------------------------------------------------------------------
>
>                 Key: FLINK-6233
>                 URL: https://issues.apache.org/jira/browse/FLINK-6233
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: hongyuhong
>            Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on rowtime streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
> FROM Orders AS o
> JOIN Shipments AS s
> ON o.orderId = s.orderId
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * Only inner joins are supported.
> * The ON clause must include an equi-join condition.
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR}} may only use rowtime, which is a system attribute. Only bounded time ranges such as {{o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}} are supported; unbounded conditions such as {{o.rowtime < s.rowtime}} are not. The condition must reference the rowtime attributes of both streams, so {{o.rowtime between rowtime () and rowtime () + 1}} is not supported either.
> A rowtime stream join will not be able to handle late data, because this would mean inserting a row into a sorted order and shifting all other computations. This would be too expensive to maintain. Therefore, we will throw an error if a user tries to use a rowtime stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin).

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
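
As a rough illustration of the pane-based cleanup proposed in point 3 of the comment, here is a minimal, self-contained Scala sketch. It is not the Flink implementation and does not use the state backend API: {{PaneCache}}, {{paneSize}}, {{add}}, {{expire}} and {{size}} are hypothetical names, and an in-memory sorted map stands in for the keyed state.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the join's record cache (not Flink state).
// Records are grouped into fixed-size panes keyed by the pane start timestamp.
class PaneCache[T](paneSize: Long) {
  require(paneSize > 0, "paneSize must be positive")

  // pane start -> records whose timestamp lies in [start, start + paneSize)
  private val panes = mutable.TreeMap.empty[Long, mutable.ArrayBuffer[T]]

  // Start of the pane that contains the given timestamp.
  private def paneStart(ts: Long): Long = ts - Math.floorMod(ts, paneSize)

  def add(ts: Long, record: T): Unit =
    panes.getOrElseUpdate(paneStart(ts), mutable.ArrayBuffer.empty[T]) += record

  // Drops every pane that lies entirely before expiredTime and
  // returns the number of records removed.
  def expire(expiredTime: Long): Int = {
    // Keys are iterated in ascending order; collect them before mutating the map.
    val expired = panes.keysIterator.takeWhile(_ + paneSize <= expiredTime).toList
    expired.map(start => panes.remove(start).map(_.size).getOrElse(0)).sum
  }

  // Total number of records currently cached.
  def size: Int = panes.valuesIterator.map(_.size).sum
}
```

With a pane size of 10, calling {{expire(15L)}} drops the pane covering [0, 10) but keeps the whole pane covering [10, 20), including a record with timestamp 12. These are the "extra records" mentioned in the comment: expiry touches whole panes instead of scanning individual timestamps, at the cost of retaining up to one pane span of stale data.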
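
The watermark synchronization mentioned at the end of the comment (both inputs being held to the lower watermark) can be pictured with the following minimal sketch. {{TwoInputWatermark}} and its methods are hypothetical names for illustration only; this is not the code of {{CoProcessOperator}} or {{InternalTimeServiceManager}}.

```scala
// Hypothetical tracker showing how a two-input operator with a shared time
// service only sees the minimum of the two input watermarks.
class TwoInputWatermark {
  private var wm1 = Long.MinValue
  private var wm2 = Long.MinValue

  // Event time visible to the operator: the lower of the two input watermarks.
  def combined: Long = math.min(wm1, wm2)

  // Watermarks never move backwards; each call returns the combined watermark.
  def onWatermark1(ts: Long): Long = { wm1 = math.max(wm1, ts); combined }
  def onWatermark2(ts: Long): Long = { wm2 = math.max(wm2, ts); combined }
}
```

If the first input advances to 120 while the second is still at 40, the combined watermark stays at 40, so cleanup driven by it cannot proceed past the slower stream.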