From: fhueske@apache.org
To: commits@flink.apache.org
Message-Id: <9eceb1b13be747f5915ee2e21fb5dc12@git.apache.org>
Subject: flink git commit: [FLINK-5654] [table] Add processing-time OVER RANGE BETWEEN x PRECEDING aggregation to SQL.
Date: Fri, 31 Mar 2017 19:52:55 +0000 (UTC)

Repository: flink
Updated Branches:
  refs/heads/master a48357db8 -> 31e120a98


[FLINK-5654] [table] Add processing-time OVER RANGE BETWEEN x PRECEDING aggregation to SQL.

This closes #3641.
This closes #3590.
This closes #3550.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31e120a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31e120a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31e120a9

Branch: refs/heads/master
Commit: 31e120a98da673ee12ae5879d95243fa0b555e00
Parents: a48357d
Author: rtudoran
Authored: Wed Mar 29 12:02:11 2017 +0200
Committer: Fabian Hueske
Committed: Fri Mar 31 21:42:48 2017 +0200

----------------------------------------------------------------------
 flink-libraries/flink-table/pom.xml             |  14 ++
 .../datastream/DataStreamOverAggregate.scala    |  10 +-
 .../table/runtime/aggregate/AggregateUtil.scala |  25 ++-
 ...ndedProcessingOverRangeProcessFunction.scala | 203 ++++++++++++++++++
 .../scala/stream/sql/WindowAggregateTest.scala  |  53 +++++
 ...ProcessingOverRangeProcessFunctionTest.scala | 204 +++++++++++++++++++
 6 files changed, 497 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
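With this change, a bounded OVER window ordered by processing time accepts a RANGE
clause; previously such queries were rejected with "processing-time OVER RANGE
PRECEDING window is not supported yet". A query of the now-supported shape, taken
from the tests added below:

    SELECT a,
      COUNT(c) OVER (
        ORDER BY procTime()
        RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS countA
    FROM MyTable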
http://git-wip-us.apache.org/repos/asf/flink/blob/31e120a9/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index a2945e8..6bcddc2 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -140,6 +140,20 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/31e120a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 2df4e02..e15db01 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -114,12 +114,14 @@ class DataStreamOverAggregate(
           // ROWS clause bounded OVER window
           createBoundedAndCurrentRowOverWindow(
             inputDS,
-            isRangeClause = true,
+            isRangeClause = false,
             isRowTimeType = false)
         } else {
           // RANGE clause bounded OVER window
-          throw new TableException(
-            "processing-time OVER RANGE PRECEDING window is not supported yet.")
+          createBoundedAndCurrentRowOverWindow(
+            inputDS,
+            isRangeClause = true,
+            isRowTimeType = false)
         }
       } else {
         throw new TableException(
@@ -206,7 +208,7 @@ class DataStreamOverAggregate(
     val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates

     val precedingOffset =
-      getLowerBoundary(logicWindow, overWindow, getInput()) + 1
+      getLowerBoundary(logicWindow, overWindow, getInput()) + (if (isRangeClause) 0 else 1)

     // get the output types
     val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]

http://git-wip-us.apache.org/repos/asf/flink/blob/31e120a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 74dc5cd..caa2818 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -139,13 +139,23 @@ object AggregateUtil {
         )
       }
     } else {
-      new BoundedProcessingOverRowProcessFunction(
-        aggregates,
-        aggFields,
-        precedingOffset,
-        inputType.getFieldCount,
-        aggregationStateType,
-        FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+      if (isRangeClause) {
+        new BoundedProcessingOverRangeProcessFunction(
+          aggregates,
+          aggFields,
+          inputType.getFieldCount,
+          aggregationStateType,
+          precedingOffset,
+          FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+      } else {
+        new BoundedProcessingOverRowProcessFunction(
+          aggregates,
+          aggFields,
+          precedingOffset,
+          inputType.getFieldCount,
+          aggregationStateType,
+          FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+      }
     }
   }

@@ -1240,4 +1250,3 @@ object AggregateUtil {
     if (b == 0) a else gcd(b, a % b)
   }
 }
-
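For context, this is how such a query can be submitted end to end through the Table
API. The sketch below is illustrative rather than part of the commit; it assumes the
Scala Table API entry points of this release (TableEnvironment.getTableEnvironment,
the sql() method, and toDataStream, some of which were renamed in later releases),
and the example data and the alias cntC are made up:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    object ProcTimeRangeOverExample {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)

        // a (Int, String, Long) stream shaped like the MyTable fixture in the tests below
        val stream = env.fromElements((1, "Hello", 10L), (2, "World", 20L))
        tEnv.registerDataStream("MyTable", stream, 'a, 'b, 'c)

        // bounded processing-time OVER RANGE aggregation, enabled by this commit
        val result = tEnv.sql(
          "SELECT a, COUNT(c) OVER (ORDER BY procTime() " +
          "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS cntC " +
          "FROM MyTable")

        result.toDataStream[Row].print()
        env.execute()
      }
    }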
http://git-wip-us.apache.org/repos/asf/flink/blob/31e120a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunction.scala
new file mode 100644
index 0000000..afab11d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunction.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{ArrayList, List => JList}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+/**
+ * ProcessFunction that computes the aggregates of a bounded processing-time OVER RANGE
+ * window on a [[org.apache.flink.streaming.api.datastream.DataStream]].
+ *
+ * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
+ *                   used for this aggregation
+ * @param aggFields the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount the number of input fields that are forwarded to the output
+ * @param rowTypeInfo the row type info of the accumulator state
+ * @param precedingTimeBoundary the preceding processing-time boundary of the window,
+ *                              in milliseconds
+ * @param inputType the row type info of the input
+ */
+class BoundedProcessingOverRangeProcessFunction(
+    private val aggregates: Array[AggregateFunction[_]],
+    private val aggFields: Array[Array[Int]],
+    private val forwardedFieldCount: Int,
+    private val rowTypeInfo: RowTypeInfo,
+    private val precedingTimeBoundary: Long,
+    private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+
+  override def open(config: Configuration) {
+    output = new Row(forwardedFieldCount + aggregates.length)
+
+    // we keep the received elements in a MapState indexed by their arrival (processing) time
+    val rowListTypeInfo: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+    val stateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("overState", rowTypeInfo)
+    accumulatorState = getRuntimeContext.getState(stateDescriptor)
+  }
+
+  override def processElement(
+    input: Row,
+    ctx: ProcessFunction[Row, Row]#Context,
+    out: Collector[Row]): Unit = {
+
+    val currentTime = ctx.timerService.currentProcessingTime
+
+    // buffer the incoming event: append it to the list of elements
+    // received at the current processing-time timestamp
+    var rowList = rowMapState.get(currentTime)
+    // a null value means that this is the first event received for this timestamp
+    if (rowList == null) {
+      rowList = new ArrayList[Row]()
+      // register a timer to process the events once the current millisecond has passed
+      ctx.timerService.registerProcessingTimeTimer(currentTime + 1)
+    }
+    rowList.add(input)
+    rowMapState.put(currentTime, rowList)
+  }
+
+  override def onTimer(
+    timestamp: Long,
+    ctx: ProcessFunction[Row, Row]#OnTimerContext,
+    out: Collector[Row]): Unit = {
+
+    // the timer was registered 1 ms after the events arrived, so the events
+    // to process carry the original timestamp (timestamp - 1)
+    val currentTime = timestamp - 1
+    var i = 0
+
+    // initialize the accumulators
+    var accumulators = accumulatorState.value()
+
+    if (null == accumulators) {
+      accumulators = new Row(aggregates.length)
+      i = 0
+      while (i < aggregates.length) {
+        accumulators.setField(i, aggregates(i).createAccumulator())
+        i += 1
+      }
+    }
+
+    // determine the elements that left the window and retract them from the aggregates
+    val limit = currentTime - precedingTimeBoundary
+
+    // we iterate over all timestamp keys in the window buffer. When we find timestamps
+    // that are no longer of interest, we retrieve the corresponding elements and retract
+    // them. Multiple elements could have been received at the same timestamp. The removal
+    // of old elements happens only once per proctime because onTimer is called only once.
+    val iter = rowMapState.keys.iterator
+    val markToRemove = new ArrayList[Long]()
+    while (iter.hasNext) {
+      val elementKey = iter.next
+      if (elementKey < limit) {
+        // element key outside of window. Retract values
+        val elementsRemove = rowMapState.get(elementKey)
+        var iRemove = 0
+        while (iRemove < elementsRemove.size()) {
+          i = 0
+          while (i < aggregates.length) {
+            val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+            aggregates(i).retract(accumulator, elementsRemove.get(iRemove)
+              .getField(aggFields(i)(0)))
+            i += 1
+          }
+          iRemove += 1
+        }
+        // mark the element for later removal, so we do not modify the MapState
+        // while iterating over its keys
+        markToRemove.add(elementKey)
+      }
+    }
+    // remove in a second step to avoid concurrent-access errors through the MapState iterator
+    i = 0
+    while (i < markToRemove.size()) {
+      rowMapState.remove(markToRemove.get(i))
+      i += 1
+    }
+
+    // get the list of elements of the current proctime
+    val currentElements = rowMapState.get(currentTime)
+    // add the current elements to the aggregates. Multiple elements might have arrived
+    // in the same proctime; they all share the same accumulator value
+    var iElements = 0
+    while (iElements < currentElements.size()) {
+      val input = currentElements.get(iElements)
+      i = 0
+      while (i < aggregates.length) {
+        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+        aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
+        i += 1
+      }
+      iElements += 1
+    }
+
+    // build and emit an output record for every event received at this proctime
+    iElements = 0
+    while (iElements < currentElements.size()) {
+      val input = currentElements.get(iElements)
+
+      // copy the forwarded fields of the input event
+      i = 0
+      while (i < forwardedFieldCount) {
+        output.setField(i, input.getField(i))
+        i += 1
+      }
+
+      // add the accumulated values to the result
+      i = 0
+      while (i < aggregates.length) {
+        val index = forwardedFieldCount + i
+        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+        output.setField(index, aggregates(i).getValue(accumulator))
+        i += 1
+      }
+      out.collect(output)
+      iElements += 1
+    }
+
+    // store the accumulators for future incremental computation
+    accumulatorState.update(accumulators)
+  }
+}
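The retract-before-accumulate structure of onTimer is the heart of this function: the
incremental accumulator survives across firings, expired milliseconds are retracted
from it, and the two-step mark-then-remove avoids mutating the MapState while iterating
over its keys. A standalone sketch of the same bookkeeping (plain Scala 2.12+, no Flink
dependencies; all names are illustrative and not part of this commit):

    import scala.collection.mutable

    object RangeEvictionSketch extends App {
      val precedingTimeBoundary = 1000L

      // processing-time millisecond -> values received during that millisecond,
      // playing the role of the MapState[Long, JList[Row]] above
      val buffer = mutable.TreeMap[Long, List[Long]]()
      var sum = 0L // incrementally maintained, like accumulatorState

      def accumulate(time: Long, values: List[Long]): Unit = {
        buffer(time) = values
        sum += values.sum
      }

      // mirrors onTimer: retract every millisecond strictly older than the limit;
      // expired keys are collected first, then removed (the mark-then-remove step)
      def fire(currentTime: Long): Long = {
        val limit = currentTime - precedingTimeBoundary
        val expired = buffer.keys.takeWhile(_ < limit).toList
        expired.foreach { k => sum -= buffer(k).sum; buffer.remove(k) }
        sum
      }

      accumulate(3L, List(1L))
      accumulate(4L, List(2L, 3L))
      println(fire(4L))    // 6: limit = -996, nothing is evicted
      accumulate(1004L, List(5L))
      println(fire(1004L)) // 10: limit = 4, millisecond 3 (value 1) is retracted
      println(fire(1005L)) // 5: limit = 5, millisecond 4 (values 2 and 3) is retracted
    }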
w0$o0") + ), + term("select", "a", "w0$o0 AS $1") + ) + + streamUtil.verifySql(sqlQuery, expected) + } + + @Test + def testPartitionedProcessingTimeBoundedWindow() = { + + val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " + + "FROM MyTable" + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "PROCTIME() AS $2") + ), + term("partitionBy","a"), + term("orderBy", "PROCTIME"), + term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"), + term("select", "a", "c", "PROCTIME", "COUNT(c) AS w0$o0", "$SUM0(c) AS w0$o1") + ), + term("select", "a", "/(CASE(>(w0$o0, 0)", "CAST(w0$o1), null), w0$o0) AS avgA") + ) + + streamUtil.verifySql(sqlQuery, expected) + } + + @Test def testNonPartitionedTumbleWindow() = { val sql = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(rowtime() TO HOUR)" val expected = http://git-wip-us.apache.org/repos/asf/flink/blob/31e120a9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala new file mode 100644 index 0000000..227bfc7 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/flink/blob/31e120a9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
new file mode 100644
index 0000000..227bfc7
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.Comparator
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.lang.{Integer => JInt, Long => JLong}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
+import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction}
+import org.apache.flink.table.runtime.aggregate.BoundedProcessingOverRangeProcessFunctionTest._
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class BoundedProcessingOverRangeProcessFunctionTest {
+
+  @Test
+  def testProcTimePartitionedOverRange(): Unit = {
+
+    val rT = new RowTypeInfo(Array[TypeInformation[_]](
+      INT_TYPE_INFO,
+      LONG_TYPE_INFO,
+      INT_TYPE_INFO,
+      STRING_TYPE_INFO,
+      LONG_TYPE_INFO),
+      Array("a", "b", "c", "d", "e"))
+
+    val rTA = new RowTypeInfo(Array[TypeInformation[_]](
+      LONG_TYPE_INFO), Array("count"))
+
+    val processFunction = new KeyedProcessOperator[String, Row, Row](
+      new BoundedProcessingOverRangeProcessFunction(
+        Array(new LongMinWithRetractAggFunction, new LongMaxWithRetractAggFunction),
+        Array(Array(4), Array(4)),
+        5,
+        rTA,
+        1000,
+        rT))
+
+    val testHarness = new KeyedOneInputStreamOperatorTestHarness[JInt, Row, Row](
+      processFunction,
+      new TupleRowSelector(0),
+      BasicTypeInfo.INT_TYPE_INFO)
+
+    testHarness.open()
+
+    // Time = 3
+    testHarness.setProcessingTime(3)
+    // key = 1
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), 0))
+    // key = 2
+    testHarness.processElement(new StreamRecord(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), 0))
+
+    // Time = 4
+    testHarness.setProcessingTime(4)
+    // key = 1
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), 0))
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), 0))
+    // key = 2
+    testHarness.processElement(new StreamRecord(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), 0))
+
+    // Time = 5
+    testHarness.setProcessingTime(5)
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), 0))
+
+    // Time = 6
+    testHarness.setProcessingTime(6)
+
+    // Time = 1002
+    testHarness.setProcessingTime(1002)
+    // key = 1
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), 0))
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), 0))
+    // key = 2
+    testHarness.processElement(new StreamRecord(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), 0))
+
+    // Time = 1003
+    testHarness.setProcessingTime(1003)
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), 0))
+
+    // Time = 1004
+    testHarness.setProcessingTime(1004)
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), 0))
+
+    // Time = 1005
+    testHarness.setProcessingTime(1005)
+    // key = 1
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), 0))
+    testHarness.processElement(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), 0))
+    // key = 2
+    testHarness.processElement(new StreamRecord(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), 0))
+
+    testHarness.setProcessingTime(1006)
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // all elements at the same proc timestamp have the same value
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), 4))
+    expectedOutput.add(new StreamRecord(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), 4))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), 5))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), 5))
+    expectedOutput.add(new StreamRecord(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), 5))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), 6))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), 1003))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), 1003))
+    expectedOutput.add(new StreamRecord(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), 1003))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), 1004))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), 1005))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 4L: JLong, 10L: JLong), 1006))
+    expectedOutput.add(new StreamRecord(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 4L: JLong, 10L: JLong), 1006))
+    expectedOutput.add(new StreamRecord(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), 1006))
+
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.",
+      expectedOutput, result, new RowResultSortComparator(6))
+
+    testHarness.close()
+  }
+}
+
+object BoundedProcessingOverRangeProcessFunctionTest {
+
+  /**
+   * Returns 0 for equal rows and non-zero for different rows.
+   */
+  class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with Serializable {
+
+    override def compare(o1: Object, o2: Object): Int = {
+      if (o1.isInstanceOf[Watermark] || o2.isInstanceOf[Watermark]) {
+        // watermarks are not expected
+        -1
+      } else {
+        val row1 = o1.asInstanceOf[StreamRecord[Row]].getValue
+        val row2 = o2.asInstanceOf[StreamRecord[Row]].getValue
+        row1.toString.compareTo(row2.toString)
+      }
+    }
+  }
+
+  /**
+   * Simple test KeySelector that returns the specified field of a Row as the key.
+   */
+  class TupleRowSelector(
+    private val selectorField: Int) extends KeySelector[Row, Integer] {
+
+    override def getKey(value: Row): Integer = {
+      value.getField(selectorField).asInstanceOf[Integer]
+    }
+  }
+}
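As a sanity check on the expected output: the two key-1 records emitted with timestamp
1006 carry min = 4 and max = 10 because the timer fires at 1006 for the elements of
millisecond 1005, so currentTime = 1005 and the eviction limit is 1005 - 1000 = 5. The
buffered milliseconds 3 and 4 (holding e = 1, 2, 3) fall below the limit and are
retracted, leaving the elements with e = 4 through 10 in the window (illustrative check):

    val remainingE = Seq(4L, 5L, 6L, 7L, 8L, 9L, 10L) // key-1 values within [5, 1005]
    assert(remainingE.min == 4L && remainingE.max == 10L)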