Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C41EA200C6D for ; Sun, 7 May 2017 14:18:31 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C2A4A160B9A; Sun, 7 May 2017 12:18:31 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C24D0160BB1 for ; Sun, 7 May 2017 14:18:29 +0200 (CEST) Received: (qmail 39724 invoked by uid 500); 7 May 2017 12:18:29 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 39705 invoked by uid 99); 7 May 2017 12:18:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 07 May 2017 12:18:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6C5D7E00A3; Sun, 7 May 2017 12:18:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: fhueske@apache.org To: commits@flink.apache.org Date: Sun, 07 May 2017 12:18:29 -0000 Message-Id: In-Reply-To: <50557b1ceaff4c71b7a2a94b5277a93c@git.apache.org> References: <50557b1ceaff4c71b7a2a94b5277a93c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] flink git commit: [FLINK-6257] [table] Refactor OVER window tests. 
archived-at: Sun, 07 May 2017 12:18:32 -0000 http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala index 4c1d6e6..125d071 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala @@ -32,64 +32,9 @@ class WindowAggregateTest extends TableTestBase { "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime) @Test - def testNonPartitionedProcessingTimeBoundedWindow() = { - - val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY proctime " + - "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS countA " + - "FROM MyTable" - val expected = - unaryNode( - "DataStreamCalc", - unaryNode( - "DataStreamOverAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "a", "c", "proctime") - ), - term("orderBy", "proctime"), - term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0") - ), - term("select", "a", "w0$o0 AS $1") - ) - - streamUtil.verifySql(sqlQuery, expected) - } - - @Test - def testPartitionedProcessingTimeBoundedWindow() = { - - val sqlQuery = - "SELECT a, " + - " AVG(c) OVER (PARTITION BY a ORDER BY proctime " + - " RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " + - "FROM MyTable" - - val expected = - unaryNode( - "DataStreamCalc", - unaryNode( - "DataStreamOverAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - 
term("select", "a", "c", "proctime") - ), - term("partitionBy","a"), - term("orderBy", "proctime"), - term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0", "$SUM0(c) AS w0$o1") - ), - term("select", "a", "/(CASE(>(w0$o0, 0)", "CAST(w0$o1), null), w0$o0) AS avgA") - ) - - streamUtil.verifySql(sqlQuery, expected) - } - - @Test def testGroupbyWithoutWindow() = { val sql = "SELECT COUNT(a) FROM MyTable GROUP BY b" + val expected = unaryNode( "DataStreamCalc", @@ -241,327 +186,4 @@ class WindowAggregateTest extends TableTestBase { streamUtil.verifySql(sqlQuery, "n/a") } - - @Test - def testUnboundPartitionedProcessingWindowWithRange() = { - val sql = "SELECT " + - "c, " + - "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " + - "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + - "from MyTable" - - val expected = - unaryNode( - "DataStreamCalc", - unaryNode( - "DataStreamOverAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "a", "c", "proctime") - ), - term("partitionBy", "c"), - term("orderBy", "proctime"), - term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1") - ), - term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2") - ) - streamUtil.verifySql(sql, expected) - } - - @Test - def testUnboundPartitionedProcessingWindowWithRow() = { - val sql = "SELECT " + - "c, " + - "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " + - "CURRENT ROW) as cnt1 " + - "from MyTable" - - val expected = - unaryNode( - "DataStreamCalc", - unaryNode( - "DataStreamOverAggregate", - streamTableNode(0), - term("partitionBy", "c"), - term("orderBy", "proctime"), - term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), - term("select", "a", "b", "c", "proctime", 
"rowtime", "COUNT(a) AS w0$o0") - ), - term("select", "c", "w0$o0 AS $1") - ) - streamUtil.verifySql(sql, expected) - } - - @Test - def testUnboundNonPartitionedProcessingWindowWithRange() = { - val sql = "SELECT " + - "c, " + - "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " + - "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + - "from MyTable" - - val expected = - unaryNode( - "DataStreamCalc", - unaryNode( - "DataStreamOverAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "a", "c", "proctime") - ), - term("orderBy", "proctime"), - term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1") - ), - term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2") - ) - streamUtil.verifySql(sql, expected) - } - - @Test - def testUnboundNonPartitionedProcessingWindowWithRow() = { - val sql = "SELECT " + - "c, " + - "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " + - "CURRENT ROW) as cnt1 " + - "from MyTable" - - val expected = - unaryNode( - "DataStreamCalc", - unaryNode( - "DataStreamOverAggregate", - streamTableNode(0), - term("orderBy", "proctime"), - term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), - term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0") - ), - term("select", "c", "w0$o0 AS $1") - ) - streamUtil.verifySql(sql, expected) - } - - @Test - def testUnboundNonPartitionedEventTimeWindowWithRange() = { - val sql = "SELECT " + - "c, " + - "count(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " + - "sum(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " + - "from MyTable" - - val expected = - unaryNode( - "DataStreamCalc", - unaryNode( - "DataStreamOverAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "a", "c", "rowtime") - ), - term("orderBy", "rowtime"), - 
term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1") - ), - term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2") - ) - streamUtil.verifySql(sql, expected) - } - - @Test - def testUnboundPartitionedEventTimeWindowWithRange() = { - val sql = "SELECT " + - "c, " + - "count(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " + - "sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " + - "from MyTable" - - val expected = - unaryNode( - "DataStreamCalc", - unaryNode( - "DataStreamOverAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "a", "c", "rowtime") - ), - term("partitionBy", "c"), - term("orderBy", "rowtime"), - term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1") - ), - term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2") - ) - streamUtil.verifySql(sql, expected) - } - - @Test - def testBoundPartitionedRowTimeWindowWithRow() = { - val sql = "SELECT " + - "c, " + - "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND " + - "CURRENT ROW) as cnt1 " + - "from MyTable" - - val expected = - unaryNode( - "DataStreamCalc", - unaryNode( - "DataStreamOverAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "a", "c", "rowtime") - ), - term("partitionBy", "c"), - term("orderBy", "rowtime"), - term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0") - ), - term("select", "c", "w0$o0 AS $1") - ) - streamUtil.verifySql(sql, expected) - } - - @Test - def testBoundNonPartitionedRowTimeWindowWithRow() = { - val sql = "SELECT " + - "c, " + - "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 5 preceding AND " + - "CURRENT ROW) as cnt1 " + - "from 
MyTable" - - val expected = - unaryNode( - "DataStreamCalc", - unaryNode( - "DataStreamOverAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "a", "c", "rowtime") - ), - term("orderBy", "rowtime"), - term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0") - ), - term("select", "c", "w0$o0 AS $1") - ) - streamUtil.verifySql(sql, expected) - } - - @Test - def testBoundPartitionedRowTimeWindowWithRange() = { - val sql = "SELECT " + - "c, " + - "count(a) OVER (PARTITION BY c ORDER BY rowtime " + - "RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) as cnt1 " + - "from MyTable" - - val expected = - unaryNode( - "DataStreamCalc", - unaryNode( - "DataStreamOverAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "a", "c", "rowtime") - ), - term("partitionBy", "c"), - term("orderBy", "rowtime"), - term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0") - ), - term("select", "c", "w0$o0 AS $1") - ) - streamUtil.verifySql(sql, expected) - } - - @Test - def testBoundNonPartitionedRowTimeWindowWithRange() = { - val sql = "SELECT " + - "c, " + - "count(a) OVER (ORDER BY rowtime " + - "RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) as cnt1 " + - "from MyTable" - - val expected = - unaryNode( - "DataStreamCalc", - unaryNode( - "DataStreamOverAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "a", "c", "rowtime") - ), - term("orderBy", "rowtime"), - term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0") - ), - term("select", "c", "w0$o0 AS $1") - ) - streamUtil.verifySql(sql, expected) - } - - @Test - def testBoundNonPartitionedProcTimeWindowWithRowRange() = { - val sql = "SELECT " + - "c, " + - "count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 preceding AND " + - "CURRENT ROW) as cnt1 " + 
- "from MyTable" - - val expected = - unaryNode( - "DataStreamCalc", - unaryNode( - "DataStreamOverAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "a", "c", "proctime") - ), - term("orderBy", "proctime"), - term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0") - ), - term("select", "c", "w0$o0 AS $1") - ) - streamUtil.verifySql(sql, expected) - } - - @Test - def testBoundPartitionedProcTimeWindowWithRowRange() = { - val sql = "SELECT " + - "c, " + - "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " + - "CURRENT ROW) as cnt1 " + - "from MyTable" - - val expected = - unaryNode( - "DataStreamCalc", - unaryNode( - "DataStreamOverAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "a", "c", "proctime") - ), - term("partitionBy", "c"), - term("orderBy", "proctime"), - term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0") - ), - term("select", "c", "w0$o0 AS $1") - ) - streamUtil.verifySql(sql, expected) - } - } http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala new file mode 100644 index 0000000..eb5acd5b --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.harness + +import java.util.{Comparator, Queue => JQueue} + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.streaming.api.operators.OneInputStreamOperator +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil} +import org.apache.flink.table.runtime.types.CRow + +class HarnessTestBase { + def createHarnessTester[IN, OUT, KEY]( + operator: OneInputStreamOperator[IN, OUT], + keySelector: KeySelector[IN, KEY], + keyType: TypeInformation[KEY]): KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT] = { + new KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT](operator, keySelector, keyType) + } + + def verify( + expected: JQueue[Object], + actual: JQueue[Object], + comparator: Comparator[Object], + checkWaterMark: Boolean = false): Unit = { + if (!checkWaterMark) { + val it = actual.iterator() + while (it.hasNext) { + val data = it.next() + if (data.isInstanceOf[Watermark]) { + actual.remove(data) + } + } + } + TestHarnessUtil.assertOutputEqualsSorted("Verify Error...", expected, 
actual, comparator) + } +} + +object HarnessTestBase { + + /** + * Return 0 for equal Rows and non zero for different rows + */ + class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with Serializable { + + override def compare(o1: Object, o2: Object): Int = { + + if (o1.isInstanceOf[Watermark] || o2.isInstanceOf[Watermark]) { + // watermark is not expected + -1 + } else { + val row1 = o1.asInstanceOf[StreamRecord[CRow]].getValue + val row2 = o2.asInstanceOf[StreamRecord[CRow]].getValue + row1.toString.compareTo(row2.toString) + } + } + } + + /** + * Tuple row key selector that returns a specified field as the selector function + */ + class TupleRowKeySelector[T]( + private val selectorField: Int) extends KeySelector[CRow, T] { + + override def getKey(value: CRow): T = { + value.row.getField(selectorField).asInstanceOf[T] + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala new file mode 100644 index 0000000..56ca85c --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala @@ -0,0 +1,974 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.harness + +import java.lang.{Integer => JInt, Long => JLong} +import java.util.concurrent.ConcurrentLinkedQueue + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.operators.KeyedProcessOperator +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.codegen.GeneratedAggregationsFunction +import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction} +import org.apache.flink.table.runtime.aggregate._ +import org.apache.flink.table.runtime.harness.HarnessTestBase._ +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row +import org.junit.Test + +class OverWindowHarnessTest extends HarnessTestBase{ + + private val rT = new RowTypeInfo(Array[TypeInformation[_]]( + INT_TYPE_INFO, + LONG_TYPE_INFO, + INT_TYPE_INFO, + STRING_TYPE_INFO, + LONG_TYPE_INFO), + Array("a", "b", "c", "d", "e")) + + private val cRT = new CRowTypeInfo(rT) + + private val aggregates = + Array(new LongMinWithRetractAggFunction, + new LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_, _]]] + private val aggregationStateType: RowTypeInfo = AggregateUtil.createAccumulatorRowType(aggregates) + + val funcCode: String = + """ + |public class 
BoundedOverAggregateHelper + | extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations { + | + | transient org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction + | fmin = null; + | + | transient org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction + | fmax = null; + | + | public BoundedOverAggregateHelper() throws Exception { + | + | fmin = (org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction) + | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils + | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" + + | "MuTG9uZ01pbldpdGhSZXRyYWN0QWdnRnVuY3Rpb26oIdX_DaMPxQIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" + + | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWluV2l0aFJldHJhY3RBZ2dGdW5jdGlvbq_ZGuzxtA_S" + + | "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" + + | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" + + | "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" + + | "mluZyRMb25nJOda0iCPo2ukAgAAeHA"); + | + | fmax = (org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction) + | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils + | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" + + | "MuTG9uZ01heFdpdGhSZXRyYWN0QWdnRnVuY3Rpb25RmsI8azNGXwIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" + + | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWF4V2l0aFJldHJhY3RBZ2dGdW5jdGlvbvnwowlX0_Qf" + + | "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" + + | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" + + | "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" + + | "mluZyRMb25nJOda0iCPo2ukAgAAeHA"); + | } + | + | public void setAggregationResults( + | org.apache.flink.types.Row accs, + | 
org.apache.flink.types.Row output) { + | + | org.apache.flink.table.functions.AggregateFunction baseClass0 = + | (org.apache.flink.table.functions.AggregateFunction) fmin; + | output.setField(5, baseClass0.getValue( + | (org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator) + | accs.getField(0))); + | + | org.apache.flink.table.functions.AggregateFunction baseClass1 = + | (org.apache.flink.table.functions.AggregateFunction) fmax; + | output.setField(6, baseClass1.getValue( + | (org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator) + | accs.getField(1))); + | } + | + | public void accumulate( + | org.apache.flink.types.Row accs, + | org.apache.flink.types.Row input) { + | + | fmin.accumulate( + | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator) + | accs.getField(0)), + | (java.lang.Long) input.getField(4)); + | + | fmax.accumulate( + | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator) + | accs.getField(1)), + | (java.lang.Long) input.getField(4)); + | } + | + | public void retract( + | org.apache.flink.types.Row accs, + | org.apache.flink.types.Row input) { + | + | fmin.retract( + | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator) + | accs.getField(0)), + | (java.lang.Long) input.getField(4)); + | + | fmax.retract( + | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator) + | accs.getField(1)), + | (java.lang.Long) input.getField(4)); + | } + | + | public org.apache.flink.types.Row createAccumulators() { + | + | org.apache.flink.types.Row accs = new org.apache.flink.types.Row(2); + | + | accs.setField( + | 0, + | fmin.createAccumulator()); + | + | accs.setField( + | 1, + | fmax.createAccumulator()); + | + | return accs; + | } + | + | public void setForwardedFields( + | org.apache.flink.types.Row input, + | org.apache.flink.types.Row output) { + | + | output.setField(0, input.getField(0)); + | output.setField(1, input.getField(1)); + 
| output.setField(2, input.getField(2)); + | output.setField(3, input.getField(3)); + | output.setField(4, input.getField(4)); + | } + | + | public org.apache.flink.types.Row createOutputRow() { + | return new org.apache.flink.types.Row(7); + | } + | + |/******* This test does not use the following methods *******/ + | public org.apache.flink.types.Row mergeAccumulatorsPair( + | org.apache.flink.types.Row a, + | org.apache.flink.types.Row b) { + | return null; + | } + | + | public void resetAccumulator(org.apache.flink.types.Row accs) { + | } + | + | public void setConstantFlags(org.apache.flink.types.Row output) { + | } + |} + """.stripMargin + + + private val funcName = "BoundedOverAggregateHelper" + + private val genAggFunction = GeneratedAggregationsFunction(funcName, funcCode) + + + @Test + def testProcTimeBoundedRowsOver(): Unit = { + + val processFunction = new KeyedProcessOperator[String, CRow, CRow]( + new ProcTimeBoundedRowsOver( + genAggFunction, + 2, + aggregationStateType, + cRT)) + + val testHarness = + createHarnessTester(processFunction,new TupleRowKeySelector[Integer](0),BasicTypeInfo + .INT_TYPE_INFO) + + testHarness.open() + + testHarness.setProcessingTime(1) + + testHarness.processElement(new StreamRecord( + CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 1)) + testHarness.processElement(new StreamRecord( + CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 1)) + testHarness.processElement(new StreamRecord( + CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 1)) + testHarness.processElement(new StreamRecord( + CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 1)) + testHarness.processElement(new StreamRecord( + CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 1)) + testHarness.processElement(new StreamRecord( + CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 1)) + testHarness.processElement(new StreamRecord( + CRow(Row.of(1: JInt, 11L: 
JLong, 1: JInt, "aaa", 5L: JLong), true), 1)) + testHarness.processElement(new StreamRecord( + CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 1)) + testHarness.processElement(new StreamRecord( + CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 1)) + + testHarness.setProcessingTime(2) + testHarness.processElement(new StreamRecord( + CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 2)) + testHarness.processElement(new StreamRecord( + CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 2)) + testHarness.processElement(new StreamRecord( + CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 2)) + testHarness.processElement(new StreamRecord( + CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 2)) + testHarness.processElement(new StreamRecord( + CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 2)) + + val result = testHarness.getOutput + + val expectedOutput = new ConcurrentLinkedQueue[Object]() + + expectedOutput.add(new StreamRecord( + CRow( + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 1)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 1)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 1)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 2L: JLong, 3L: JLong), true), 1)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 1)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 3L: JLong, 4L: JLong), true), 1)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 4L: JLong, 5L: JLong), true), 
1)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 5L: JLong, 6L: JLong), true), 1)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 20L: JLong, 30L: JLong), true), 1)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 6L: JLong, 7L: JLong), true), 2)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 7L: JLong, 8L: JLong), true), 2)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 8L: JLong, 9L: JLong), true), 2)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 9L: JLong, 10L: JLong), true), 2)) + expectedOutput.add(new StreamRecord( + CRow( + Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), true), 2)) + + verify(expectedOutput, result, new RowResultSortComparator(6)) + + testHarness.close() + } + + /** + * NOTE: all elements at the same proc timestamp have the same value per key + */ + @Test + def testProcTimeBoundedRangeOver(): Unit = { + + val processFunction = new KeyedProcessOperator[String, CRow, CRow]( + new ProcTimeBoundedRangeOver( + genAggFunction, + 1000, + aggregationStateType, + cRT)) + + val testHarness = + createHarnessTester( + processFunction, + new TupleRowKeySelector[Integer](0), + BasicTypeInfo.INT_TYPE_INFO) + + testHarness.open() + + testHarness.setProcessingTime(3) + testHarness.processElement(new StreamRecord( + CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0)) + testHarness.processElement(new StreamRecord( + CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 0)) + + testHarness.setProcessingTime(4) + testHarness.processElement(new StreamRecord( + CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0)) + 
    // NOTE(review): continuation of a processing-time bounded OVER window test whose
    // method header and harness setup lie before this chunk — confirm against the full file.
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 0))

    testHarness.setProcessingTime(5)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0))

    testHarness.setProcessingTime(6)

    // jump past the 1000ms mark so earlier rows fall out of the bounded window
    testHarness.setProcessingTime(1002)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0))

    testHarness.setProcessingTime(1003)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0))

    testHarness.setProcessingTime(1004)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0))

    testHarness.setProcessingTime(1005)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 0))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 0))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 0))

    testHarness.setProcessingTime(1006)

    val result = testHarness.getOutput

    val expectedOutput = new ConcurrentLinkedQueue[Object]()

    // all elements at the same proc timestamp have the same value per key
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 4))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 4))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true), 5))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 5))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 5))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 6))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true), 1003))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 1003))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 1003))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 1004))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true), 1005))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 4L: JLong, 10L: JLong), true), 1006))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 4L: JLong, 10L: JLong), true), 1006))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), true), 1006))

    verify(expectedOutput, result, new RowResultSortComparator(6))

    testHarness.close()
  }

  /**
    * Harness test for ProcTimeUnboundedPartitionedOver: processing-time OVER window with
    * UNBOUNDED PRECEDING, partitioned on field 0 (Integer key).
    * Each expected row carries the five input fields plus two appended aggregate columns;
    * the expected values are consistent with MIN and MAX over the last Long field
    * (genAggFunction is defined outside this chunk — confirm the aggregates there).
    */
  @Test
  def testProcTimeUnboundedOver(): Unit = {

    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
      new ProcTimeUnboundedPartitionedOver(
        genAggFunction,
        aggregationStateType))

    val testHarness =
      createHarnessTester(
        processFunction,
        new TupleRowKeySelector[Integer](0),
        BasicTypeInfo.INT_TYPE_INFO)

    testHarness.open()

    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 0))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 0))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0))

    // advancing processing time must not evict state — the window is unbounded
    testHarness.setProcessingTime(1003)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 1003))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 1003))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 1003))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 1003))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 1003))

    val result = testHarness.getOutput

    val expectedOutput = new ConcurrentLinkedQueue[Object]()

    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 0))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 0))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 0))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 0))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 0))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 0))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 5L: JLong), true), 0))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 0))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 0))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 1003))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true), 1003))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 1L: JLong, 9L: JLong), true), 1003))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true), 1003))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 1003))

    verify(expectedOutput, result, new RowResultSortComparator(6))
    testHarness.close()
  }

  /**
    * Harness test for RowTimeBoundedRangeOver: event-time RANGE OVER window with a
    * 4000ms preceding bound, keyed on the String field at index 3.
    * all elements at the same row-time have the same value per key
    */
  @Test
  def testRowTimeBoundedRangeOver(): Unit = {

    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
      new RowTimeBoundedRangeOver(
        genAggFunction,
        aggregationStateType,
        cRT,
        4000))

    val testHarness =
      createHarnessTester(
        processFunction,
        new TupleRowKeySelector[String](3),
        BasicTypeInfo.STRING_TYPE_INFO)

    testHarness.open()

    testHarness.processWatermark(1)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 2))

    testHarness.processWatermark(2)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 3))

    testHarness.processWatermark(4000)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001))

    testHarness.processWatermark(4001)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4002))

    testHarness.processWatermark(4002)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "aaa", 4L: JLong), true), 4003))

    testHarness.processWatermark(4800)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 11L: JLong, 1: JInt, "bbb", 25L: JLong), true), 4801))

    testHarness.processWatermark(6500)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501))

    testHarness.processWatermark(7000)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001))

    testHarness.processWatermark(8000)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001))

    testHarness.processWatermark(12000)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001))

    // final watermark flushes all pending event-time timers
    testHarness.processWatermark(19000)

    val result = testHarness.getOutput

    val expectedOutput = new ConcurrentLinkedQueue[Object]()

    // all elements at the same row-time have the same value per key
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 2))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 3))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 4001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4002))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 0L: JLong, 0: JInt, "aaa", 4L: JLong, 2L: JLong, 4L: JLong), true), 4003))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 11L: JLong, 1: JInt, "bbb", 25L: JLong, 25L: JLong, 25L: JLong), true), 4801))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 2L: JLong, 6L: JLong), true), 6501))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 2L: JLong, 6L: JLong), true), 6501))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 2L: JLong, 7L: JLong), true), 7001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true), 8001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 25L: JLong, 30L: JLong), true), 6501))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 8L: JLong, 10L: JLong), true), 12001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 8L: JLong, 10L: JLong), true), 12001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true), 12001))

    verify(expectedOutput, result, new RowResultSortComparator(6))
    testHarness.close()
  }

  /**
    * Harness test for RowTimeBoundedRowsOver: event-time ROWS OVER window with a
    * 3-row preceding bound, keyed on the String field at index 3.
    */
  @Test
  def testRowTimeBoundedRowsOver(): Unit = {

    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
      new RowTimeBoundedRowsOver(
        genAggFunction,
        aggregationStateType,
        cRT,
        3))

    val testHarness =
      createHarnessTester(
        processFunction,
        new TupleRowKeySelector[String](3),
        BasicTypeInfo.STRING_TYPE_INFO)

    testHarness.open()

    testHarness.processWatermark(800)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801))

    testHarness.processWatermark(2500)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 2501))

    testHarness.processWatermark(4000)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4001))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 4001))

    testHarness.processWatermark(4800)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 4801))

    testHarness.processWatermark(6500)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501))

    testHarness.processWatermark(7000)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001))

    testHarness.processWatermark(8000)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001))

    testHarness.processWatermark(12000)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001))

    testHarness.processWatermark(19000)

    val result = testHarness.getOutput

    val expectedOutput = new ConcurrentLinkedQueue[Object]()

    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 801))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 2501))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 4001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 4001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 2L: JLong, 4L: JLong), true), 4801))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 3L: JLong, 5L: JLong), true), 6501))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 4L: JLong, 6L: JLong), true), 6501))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 6501))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 5L: JLong, 7L: JLong), true), 7001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 6L: JLong, 8L: JLong), true), 8001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 7L: JLong, 9L: JLong), true), 12001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 8L: JLong, 10L: JLong), true), 12001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 20L: JLong, 40L: JLong), true), 12001))

    verify(expectedOutput, result, new RowResultSortComparator(6))
    testHarness.close()
  }

  /**
    * Harness test for RowTimeUnboundedRangeOver: event-time RANGE OVER window with
    * UNBOUNDED PRECEDING, keyed on the String field at index 3.
    * all elements at the same row-time have the same value per key
    */
  @Test
  def testRowTimeUnboundedRangeOver(): Unit = {

    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
      new RowTimeUnboundedRangeOver(
        genAggFunction,
        aggregationStateType,
        cRT))

    val testHarness =
      createHarnessTester(
        processFunction,
        new TupleRowKeySelector[String](3),
        BasicTypeInfo.STRING_TYPE_INFO)

    testHarness.open()

    testHarness.processWatermark(800)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801))

    testHarness.processWatermark(2500)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 2501))

    testHarness.processWatermark(4000)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4001))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 4001))

    testHarness.processWatermark(4800)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 4801))

    testHarness.processWatermark(6500)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501))

    testHarness.processWatermark(7000)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001))

    testHarness.processWatermark(8000)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001))

    testHarness.processWatermark(12000)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001))

    testHarness.processWatermark(19000)

    val result = testHarness.getOutput

    val expectedOutput = new ConcurrentLinkedQueue[Object]()

    // all elements at the same row-time have the same value per key
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 801))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 2501))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true), 4001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 4001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 4801))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true), 6501))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 6501))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 6501))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 7001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true), 8001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 1L: JLong, 10L: JLong), true), 12001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true), 12001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 12001))

    verify(expectedOutput, result, new RowResultSortComparator(6))
    testHarness.close()
  }

  /**
    * Harness test for RowTimeUnboundedRowsOver: event-time ROWS OVER window with
    * UNBOUNDED PRECEDING, keyed on the String field at index 3.
    * Unlike the RANGE variant, rows sharing a row-time get per-row (not per-timestamp)
    * aggregate values.
    */
  @Test
  def testRowTimeUnboundedRowsOver(): Unit = {

    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
      new RowTimeUnboundedRowsOver(
        genAggFunction,
        aggregationStateType,
        cRT))

    val testHarness =
      createHarnessTester(
        processFunction,
        new TupleRowKeySelector[String](3),
        BasicTypeInfo.STRING_TYPE_INFO)

    testHarness.open()

    testHarness.processWatermark(800)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801))

    testHarness.processWatermark(2500)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 2501))

    testHarness.processWatermark(4000)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4001))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 4001))

    testHarness.processWatermark(4800)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 4801))

    testHarness.processWatermark(6500)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501))

    testHarness.processWatermark(7000)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001))

    testHarness.processWatermark(8000)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001))

    testHarness.processWatermark(12000)
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001))
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001))

    testHarness.processWatermark(19000)

    val result = testHarness.getOutput

    val expectedOutput = new ConcurrentLinkedQueue[Object]()

    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 801))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 2501))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 4001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 4001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 4801))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 5L: JLong), true), 6501))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 6501))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 6501))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 7001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true), 8001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 1L: JLong, 9L: JLong), true), 12001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true), 12001))
    expectedOutput.add(new StreamRecord(
      CRow(
        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 12001))

    verify(expectedOutput, result, new RowResultSortComparator(6))
    testHarness.close()
  }
}