Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C0915200CF7 for ; Tue, 19 Sep 2017 09:08:04 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id BCF1E1609E1; Tue, 19 Sep 2017 07:08:04 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B53BA1609DB for ; Tue, 19 Sep 2017 09:08:03 +0200 (CEST) Received: (qmail 46062 invoked by uid 500); 19 Sep 2017 07:08:02 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 46051 invoked by uid 99); 19 Sep 2017 07:08:02 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Sep 2017 07:08:02 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 666911A32AE for ; Tue, 19 Sep 2017 07:08:02 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.02 X-Spam-Level: X-Spam-Status: No, score=-4.02 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id elbKt0NaNfmF for ; Tue, 19 Sep 2017 07:08:00 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server 
at mx1-lw-eu.apache.org) with SMTP id 064E35FDE7 for ; Tue, 19 Sep 2017 07:07:58 +0000 (UTC) Received: (qmail 45921 invoked by uid 99); 19 Sep 2017 07:07:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Sep 2017 07:07:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BEAFBF5549; Tue, 19 Sep 2017 07:07:57 +0000 (UTC) From: xccui To: issues@flink.incubator.apache.org Reply-To: issues@flink.incubator.apache.org References: In-Reply-To: Subject: [GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i... Content-Type: text/plain Message-Id: <20170919070757.BEAFBF5549@git1-us-west.apache.org> Date: Tue, 19 Sep 2017 07:07:57 +0000 (UTC) archived-at: Tue, 19 Sep 2017 07:08:04 -0000 Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139611419 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala --- @@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin( // Initialize the data caches. 
val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType) val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache", - BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( + timeIndicator + "InnerJoinLeftCache", + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], + leftListTypeInfo) leftCache = getRuntimeContext.getMapState(leftStateDescriptor) val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType) val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] = - new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache", - BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo) + new MapStateDescriptor[Long, JList[Row]]( + timeIndicator + "InnerJoinRightCache", + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], + rightListTypeInfo) rightCache = getRuntimeContext.getMapState(rightStateDescriptor) // Initialize the timer states. val leftTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", - classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long]) leftTimerState = getRuntimeContext.getState(leftTimerStateDesc) val rightTimerStateDesc: ValueStateDescriptor[Long] = - new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", - classOf[Long]) + new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long]) rightTimerState = getRuntimeContext.getState(rightTimerStateDesc) } /** - * Process records from the left stream. - * - * @param cRowValue the input record - * @param ctx the context to register timer or get current time - * @param out the collector for outputting results - * + * Process rows from the left stream. 
*/ override def processElement1( - cRowValue: CRow, - ctx: CoProcessFunction[CRow, CRow, CRow]#Context, - out: Collector[CRow]): Unit = { - val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true) - getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { + updateOperatorTime(ctx) + val rowTime: Long = getTimeForLeftStream(ctx, cRowValue) + val oppositeLowerBound: Long = rowTime - rightRelativeSize + val oppositeUpperBound: Long = rowTime + leftRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, leftOperatorTime, + oppositeLowerBound, + oppositeUpperBound, rightOperatorTime, rightTimerState, leftCache, rightCache, - true + leftRow = true ) } /** - * Process records from the right stream. - * - * @param cRowValue the input record - * @param ctx the context to get current time - * @param out the collector for outputting results - * + * Process rows from the right stream. */ override def processElement2( - cRowValue: CRow, - ctx: CoProcessFunction[CRow, CRow, CRow]#Context, - out: Collector[CRow]): Unit = { - val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false) - getCurrentOperatorTime(ctx) + cRowValue: CRow, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow]): Unit = { + updateOperatorTime(ctx) + val rowTime: Long = getTimeForRightStream(ctx, cRowValue) + val oppositeLowerBound: Long = rowTime - leftRelativeSize + val oppositeUpperBound: Long = rowTime + rightRelativeSize processElement( cRowValue, - timeForRecord, + rowTime, ctx, out, rightOperatorTime, + oppositeLowerBound, + oppositeUpperBound, leftOperatorTime, leftTimerState, rightCache, leftCache, - false + leftRow = false ) } /** - * Put a record from the input stream into the cache and iterate the opposite cache to - * output records meeting the join conditions. 
If there is no timer set for the OPPOSITE + * Put a row from the input stream into the cache and iterate the opposite cache to + * output join results meeting the conditions. If there is no timer set for the OPPOSITE * STREAM, register one. */ private def processElement( - cRowValue: CRow, - timeForRecord: Long, - ctx: CoProcessFunction[CRow, CRow, CRow]#Context, - out: Collector[CRow], - myWatermark: Long, - oppositeWatermark: Long, - oppositeTimeState: ValueState[Long], - recordListCache: MapState[Long, JList[Row]], - oppositeCache: MapState[Long, JList[Row]], - leftRecord: Boolean): Unit = { - if (relativeWindowSize > 0) { - //TODO Shall we consider adding a method for initialization with the context and collector? - cRowWrapper.out = out - - val record = cRowValue.row - - //TODO Only if the time of the record is greater than the watermark, can we continue. - if (timeForRecord >= myWatermark - allowedLateness) { - val oppositeLowerBound: Long = - if (leftRecord) timeForRecord - rightRelativeSize else timeForRecord - leftRelativeSize - - val oppositeUpperBound: Long = - if (leftRecord) timeForRecord + leftRelativeSize else timeForRecord + rightRelativeSize - - // Put the record into the cache for later use. - val recordList = if (recordListCache.contains(timeForRecord)) { - recordListCache.get(timeForRecord) - } else { - new util.ArrayList[Row]() - } - recordList.add(record) - recordListCache.put(timeForRecord, recordList) - - // Register a timer on THE OTHER STREAM to remove records from the cache once they are - // expired. 
- if (oppositeTimeState.value == 0) { - registerCleanUpTimer( - ctx, timeForRecord, oppositeWatermark, oppositeTimeState, leftRecord, true) - } + cRowValue: CRow, + timeForRow: Long, + ctx: CoProcessFunction[CRow, CRow, CRow]#Context, + out: Collector[CRow], + myWatermark: Long, + oppositeLowerBound: Long, + oppositeUpperBound: Long, + oppositeWatermark: Long, + oppositeTimeState: ValueState[Long], + rowListCache: MapState[Long, JList[Row]], + oppositeCache: MapState[Long, JList[Row]], + leftRow: Boolean): Unit = { + cRowWrapper.out = out + val row = cRowValue.row + if (!checkRowOutOfDate(timeForRow, myWatermark)) { + // Put the row into the cache for later use. + var rowList = rowListCache.get(timeForRow) + if (null == rowList) { + rowList = new ArrayList[Row](1) + } + rowList.add(row) + rowListCache.put(timeForRow, rowList) + // Register a timer on THE OPPOSITE STREAM to remove rows from the cache once they are + // expired. + if (oppositeTimeState.value == 0) { + registerCleanUpTimer( + ctx, timeForRow, oppositeWatermark, oppositeTimeState, leftRow, firstTimer = true) + } - // Join the record with records from the opposite stream. - val oppositeIterator = oppositeCache.iterator() - var oppositeEntry: Entry[Long, util.List[Row]] = null - var oppositeTime: Long = 0L; - while (oppositeIterator.hasNext) { - oppositeEntry = oppositeIterator.next - oppositeTime = oppositeEntry.getKey - if (oppositeTime < oppositeLowerBound - allowedLateness) { - //TODO Considering the data out-of-order, we should not remove records here. - } else if (oppositeTime >= oppositeLowerBound && oppositeTime <= oppositeUpperBound) { - val oppositeRows = oppositeEntry.getValue - var i = 0 - if (leftRecord) { - while (i < oppositeRows.size) { - joinFunction.join(record, oppositeRows.get(i), cRowWrapper) - i += 1 - } - } else { - while (i < oppositeRows.size) { - joinFunction.join(oppositeRows.get(i), record, cRowWrapper) - i += 1 - } + // Join the row with rows from the opposite stream. 
+ val oppositeIterator = oppositeCache.iterator() + while (oppositeIterator.hasNext) { + val oppositeEntry = oppositeIterator.next + val oppositeTime = oppositeEntry.getKey + if (oppositeTime >= oppositeLowerBound && oppositeTime <= oppositeUpperBound) { + val oppositeRows = oppositeEntry.getValue + var i = 0 + if (leftRow) { + while (i < oppositeRows.size) { + joinFunction.join(row, oppositeRows.get(i), cRowWrapper) + i += 1 + } + } else { + while (i < oppositeRows.size) { + joinFunction.join(oppositeRows.get(i), row, cRowWrapper) + i += 1 } - } else if (oppositeTime > oppositeUpperBound) { - //TODO If the keys are ordered, can we break here? } } - } else { - //TODO Need some extra logic here? - LOG.warn(s"$record is out-of-date.") + // We could do the short-cutting optimization here once we get a state with ordered keys. } } + // We need to deal with the late data in the future. --- End diff -- Totally agree. We don't need to handle late records. However, I think we should provide some runtime statistics about the late data. That could help users learn more about their data and the watermark assigner in use. What do you think? ---