Date: Wed, 19 Apr 2017 09:11:42 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries

    [ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974357#comment-15974357 ]

ASF GitHub Bot commented on FLINK-6250:
---------------------------------------

Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112151646

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,238 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.
    + * See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFields: Array[Array[Int]],
    +    private val distinctAggsFlag: Array[Boolean],
    +    private val precedingOffset: Long,
    +    private val forwardedFieldCount: Int,
    +    private val aggregatesTypeInfo: RowTypeInfo,
    +    private val inputType: TypeInformation[Row])
    +  extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkArgument(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueState: MapState[Any, Row] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // We also keep a counter of processed elements
    +    // and the timestamp of the oldest element.
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor: ValueStateDescriptor[Long] =
    +      new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor: ValueStateDescriptor[Long] =
    +      new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +
    +    val distinctValDescriptor: MapStateDescriptor[Any, Row] =
    +      new MapStateDescriptor[Any, Row]("distinctValuesBufferMapState", classOf[Any], classOf[Row])
    +    distinctValueState = getRuntimeContext.getMapState(distinctValDescriptor)
    +  }
    +
    +  override def processElement(
    +      input: Row,
    +      ctx: ProcessFunction[Row, Row]#Context,
    +      out: Collector[Row]): Unit = {
    +
    +    val currentTime = ctx.timerService.currentProcessingTime
    +    var i = 0
    +
    +    // initialize state for the processed element
    +    var accumulators = accumulatorState.value
    +    if (accumulators == null) {
    +      accumulators = new Row(aggregates.length)
    +      while (i < aggregates.length) {
    +        accumulators.setField(i, aggregates(i).createAccumulator())
    +        i += 1
    +      }
    +    }
    +
    +    // get the smallest timestamp
    +    var smallestTs = smallestTsState.value
    +    if (smallestTs == 0L) {
    +      smallestTs = currentTime
    +      smallestTsState.update(smallestTs)
    +    }
    +    // get the previous counter value
    +    var counter = counterState.value
    +
    +    if (counter == precedingOffset) {
    +      val retractList = rowMapState.get(smallestTs)
    +
    +      // get the oldest element beyond the buffer size
    +      // and, if the oldest element exists, retract its value
    +      var removeCounter: Integer = 0
    +      var distinctCounter: Integer = 0
    +      var retractVal: Object = null
    +      i = 0
    +      while (i < aggregates.length) {
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        retractVal = retractList.get(0).getField(aggFields(i)(0))
    +        if (distinctAggsFlag(i)) {
    +          distinctCounter += 1
    +          val counterRow = distinctValueState.get(retractVal)
    --- End diff ---
    
    the map state takes Any as key, but I agree with you that I didn't test this aspect thoroughly


> Distinct procTime with Rows boundaries
> --------------------------------------
>
>                 Key: FLINK-6250
>                 URL: https://issues.apache.org/jira/browse/FLINK-6250
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: radu
>            Assignee: Stefano Bortoli
>
> Support procTime with ROWS boundaries:
> Q1.1. `SELECT SUM(DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.2. `SELECT COUNT(b), SUM(DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
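
To make the frame semantics of the queries above concrete, here is a small worked example (the input values are invented for illustration). With rows carrying b = 1, 2, 2, 3 arriving in processing-time order, the frame ROWS BETWEEN 2 PRECEDING AND CURRENT ROW covers at most three rows, and SUM(DISTINCT b) sums each distinct value in the frame exactly once:

    row   frame (values of b)   SUM(DISTINCT b)
    1     1                     1
    2     1, 2                  3
    3     1, 2, 2               3
    4     2, 2, 3               5

Row 3 leaves the sum unchanged because the second 2 is a duplicate inside the frame; at row 4 the oldest row (b = 1) drops out of the frame and its contribution is retracted.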
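The review thread above turns on how distinctValueState (a MapState[Any, Row] keyed by the raw distinct value) tracks how many rows in the frame still carry each value, so that a value is accumulated on its first occurrence and retracted only when its last occurrence leaves the frame. The following is a minimal, self-contained sketch of that reference-counting idea; the class name and the plain mutable.Map are illustrative stand-ins, not Flink's keyed-state API, and the diff is truncated before the PR's own counter-row update is visible:

    import scala.collection.mutable

    // Illustrative stand-in for the distinct-value bookkeeping in the PR:
    // count how many rows in the frame still carry each distinct value.
    class DistinctRefCounter[T] {
      private val refCounts = mutable.Map.empty[T, Long]

      // A row carrying `value` enters the frame; returns true if the value
      // is new to the frame and must be accumulated.
      def add(value: T): Boolean = {
        val count = refCounts.getOrElse(value, 0L)
        refCounts(value) = count + 1
        count == 0L
      }

      // A row carrying `value` leaves the frame; returns true if this was
      // the last occurrence and the value must be retracted.
      def remove(value: T): Boolean = refCounts.get(value) match {
        case Some(1L) => refCounts.remove(value); true
        case Some(n)  => refCounts(value) = n - 1; false
        case None     => false // never tracked; nothing to retract
      }
    }

    object DistinctRefCounterExample extends App {
      val refs = new DistinctRefCounter[Int]
      println(refs.add(2))    // true  -> first 2 in the frame, accumulate it
      println(refs.add(2))    // false -> duplicate, accumulator unchanged
      println(refs.remove(2)) // false -> one 2 still in the frame
      println(refs.remove(2)) // true  -> last 2 left, retract it
    }

In the PR itself the per-value count appears to live in the Row stored in distinctValueState (the counterRow read for retractVal just before the diff cuts off), which is also why the reviewer's question about using Any as the map-state key matters: key equality and serialization of arbitrary values is exactly the untested aspect acknowledged in the comment.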