Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 00481200C71 for ; Fri, 31 Mar 2017 00:04:19 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F3579160BA1; Thu, 30 Mar 2017 22:04:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D30E3160BB4 for ; Fri, 31 Mar 2017 00:04:16 +0200 (CEST) Received: (qmail 6897 invoked by uid 500); 30 Mar 2017 22:04:15 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 5139 invoked by uid 99); 30 Mar 2017 22:04:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Mar 2017 22:04:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 403D9E9663; Thu, 30 Mar 2017 22:04:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: fhueske@apache.org To: commits@flink.apache.org Date: Thu, 30 Mar 2017 22:05:01 -0000 Message-Id: <230100cf813544d787207a88c0d54fb4@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [50/50] [abbrv] flink git commit: [FLINK-5653] [table] Add processing-time OVER ROWS BETWEEN x PRECEDING aggregation to SQL. archived-at: Thu, 30 Mar 2017 22:04:19 -0000 [FLINK-5653] [table] Add processing-time OVER ROWS BETWEEN x PRECEDING aggregation to SQL. This closes #3653. This closes #3574. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee033c90 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee033c90 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee033c90 Branch: refs/heads/table-retraction Commit: ee033c903b20d7a233009764b6b96e78eea5b981 Parents: 44f9c76 Author: Stefano Bortoli Authored: Thu Mar 30 11:28:41 2017 +0200 Committer: Fabian Hueske Committed: Thu Mar 30 22:12:21 2017 +0200 ---------------------------------------------------------------------- .../datastream/DataStreamOverAggregate.scala | 6 +- .../table/runtime/aggregate/AggregateUtil.scala | 9 +- ...oundedProcessingOverRowProcessFunction.scala | 181 ++++++++++++++++++ .../table/api/scala/stream/sql/SqlITCase.scala | 184 ++++++++++++++++++- .../scala/stream/sql/WindowAggregateTest.scala | 55 ++++++ 5 files changed, 423 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ee033c90/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala index e24dd23..2df4e02 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala @@ -112,8 +112,10 @@ class DataStreamOverAggregate( // bounded OVER window if (overWindow.isRows) { // ROWS clause bounded OVER window - throw new TableException( - "processing-time OVER ROWS PRECEDING window is not supported yet.") + createBoundedAndCurrentRowOverWindow( + inputDS, + isRangeClause = true, + isRowTimeType = false) } else { // RANGE clause bounded OVER window throw new TableException( http://git-wip-us.apache.org/repos/asf/flink/blob/ee033c90/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index 93ab7b7..88e9d68 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -138,8 +138,13 @@ object AggregateUtil { ) } } else { - throw TableException( - "Bounded partitioned proc-time OVER aggregation is not supported yet.") + new BoundedProcessingOverRowProcessFunction( + aggregates, + aggFields, + precedingOffset, + inputType.getFieldCount, + aggregationStateType, + FlinkTypeFactory.toInternalRowTypeInfo(inputType)) } } http://git-wip-us.apache.org/repos/asf/flink/blob/ee033c90/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala new file mode 100644 index 0000000..454b177 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{List => JList} + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +class BoundedProcessingOverRowProcessFunction( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val precedingOffset: Long, + private val forwardedFieldCount: Int, + private val aggregatesTypeInfo: RowTypeInfo, + private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + Preconditions.checkArgument(precedingOffset > 0) + + private var accumulatorState: ValueState[Row] = _ + private var rowMapState: MapState[Long, JList[Row]] = _ + private var output: Row = _ + private var counterState: ValueState[Long] = _ + private var smallestTsState: ValueState[Long] = _ + + override def open(config: Configuration) { + + output = new Row(forwardedFieldCount + aggregates.length) + // We keep the elements received in a Map state keyed + // by the ingestion time in the operator. + // we also keep counter of processed elements + // and timestamp of oldest element + val rowListTypeInfo: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]] + + val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState", + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) + rowMapState = getRuntimeContext.getMapState(mapStateDescriptor) + + val aggregationStateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo) + accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor) + + val processedCountDescriptor : ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("processedCountState", classOf[Long]) + counterState = getRuntimeContext.getState(processedCountDescriptor) + + val smallestTimestampDescriptor : ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("smallestTSState", classOf[Long]) + smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor) + } + + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + + val currentTime = ctx.timerService.currentProcessingTime + var i = 0 + + // initialize state for the processed element + var accumulators = accumulatorState.value + if (accumulators == null) { + accumulators = new Row(aggregates.length) + while (i < aggregates.length) { + accumulators.setField(i, aggregates(i).createAccumulator()) + i += 1 + } + } + + // get smallest timestamp + var smallestTs = smallestTsState.value + if (smallestTs == 0L) { + smallestTs = currentTime + smallestTsState.update(smallestTs) + } + // get previous counter value + var counter = counterState.value + + if (counter == precedingOffset) { + val retractList = rowMapState.get(smallestTs) + + // get oldest element beyond buffer size + // and if oldest element exist, retract value + i = 0 + while (i < aggregates.length) { + val accumulator = accumulators.getField(i).asInstanceOf[Accumulator] + aggregates(i).retract(accumulator, retractList.get(0).getField(aggFields(i))) + i += 1 + } + retractList.remove(0) + // if reference timestamp list not empty, keep the list + if (!retractList.isEmpty) { + rowMapState.put(smallestTs, retractList) + } // if smallest timestamp list is empty, remove and find new smallest + else { + rowMapState.remove(smallestTs) + val iter = rowMapState.keys.iterator + var currentTs: Long = 0L + var newSmallestTs: Long = Long.MaxValue + while (iter.hasNext) { + currentTs = iter.next + if (currentTs < newSmallestTs) { + newSmallestTs = currentTs + } + } + smallestTsState.update(newSmallestTs) + } + } // we update the counter only while buffer is getting filled + else { + counter += 1 + counterState.update(counter) + } + + // copy forwarded fields in output row + i = 0 + while (i < forwardedFieldCount) { + output.setField(i, input.getField(i)) + i += 1 + } + + // accumulate current row and set aggregate in output row + i = 0 + while (i < aggregates.length) { + val index = forwardedFieldCount + i + val accumulator = accumulators.getField(i).asInstanceOf[Accumulator] + aggregates(i).accumulate(accumulator, input.getField(aggFields(i))) + output.setField(index, aggregates(i).getValue(accumulator)) + i += 1 + } + + // update map state, accumulator state, counter and timestamp + val currentTimeState = rowMapState.get(currentTime) + if (currentTimeState != null) { + currentTimeState.add(input) + rowMapState.put(currentTime, currentTimeState) + } else { // add new input + val newList = new util.ArrayList[Row] + newList.add(input) + rowMapState.put(currentTime, newList) + } + + accumulatorState.update(accumulators) + + out.collect(output) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/ee033c90/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala index 0d3a46c..67d13b0 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala @@ -344,7 +344,7 @@ class SqlITCase extends StreamingWithStateTestBase { val expected = mutable.MutableList( "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3", - "Hello,2,3,4", "Hello,2,3,5","Hello,2,3,6", + "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6", "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12", "Hello,6,3,15", "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21", @@ -471,12 +471,12 @@ class SqlITCase extends StreamingWithStateTestBase { val expected = mutable.MutableList( "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3", - "Hello,2,6,9", "Hello,3,6,9","Hello,2,6,9", + "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9", "Hello,3,4,9", "Hello,4,2,7", "Hello,5,2,9", - "Hello,6,2,11","Hello,65,2,12", - "Hello,9,2,12","Hello,9,2,12","Hello,18,3,18", + "Hello,6,2,11", "Hello,65,2,12", + "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18", "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7", "Hello World,8,2,15", "Hello World,20,1,20") @@ -543,12 +543,12 @@ class SqlITCase extends StreamingWithStateTestBase { val expected = mutable.MutableList( "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3", - "Hello,2,6,9", "Hello,3,6,9","Hello,2,6,9", + "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9", "Hello,3,4,9", "Hello,4,2,7", "Hello,5,2,9", - "Hello,6,2,11","Hello,65,2,12", - "Hello,9,2,12","Hello,9,2,12","Hello,18,3,18", + "Hello,6,2,11", "Hello,65,2,12", + "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18", "Hello World,7,4,25", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7", "Hello World,8,2,15", "Hello World,20,1,20") @@ -556,7 +556,7 @@ class SqlITCase extends StreamingWithStateTestBase { } /** - * All aggregates must be computed on the same window. + * All aggregates must be computed on the same window. */ @Test(expected = classOf[TableException]) def testMultiWindow(): Unit = { @@ -972,6 +972,174 @@ class SqlITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + @Test + def testPartitionedProcTimeOverWindow(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStateBackend(getStateBackend) + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setParallelism(1) + StreamITCase.testResults = mutable.MutableList() + + val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + tEnv.registerTable("MyTable", t) + + val sqlQuery = "SELECT a, " + + " SUM(c) OVER (" + + " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " + + " MIN(c) OVER (" + + " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " + + " FROM MyTable" + + val result = tEnv.sql(sqlQuery).toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "1,0,0", + "2,1,1", + "2,3,1", + "3,3,3", + "3,7,3", + "3,12,3", + "4,6,6", + "4,13,6", + "4,21,6", + "4,24,7", + "5,10,10", + "5,21,10", + "5,33,10", + "5,36,11", + "5,39,12") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testPartitionedProcTimeOverWindow2(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStateBackend(getStateBackend) + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setParallelism(1) + StreamITCase.testResults = mutable.MutableList() + + val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + tEnv.registerTable("MyTable", t) + + val sqlQuery = "SELECT a, " + + " SUM(c) OVER (" + + " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS sumC , " + + " MIN(c) OVER (" + + " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " + + " FROM MyTable" + + val result = tEnv.sql(sqlQuery).toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "1,0,0", + "2,1,1", + "2,3,1", + "3,3,3", + "3,7,3", + "3,12,3", + "4,6,6", + "4,13,6", + "4,21,6", + "4,30,6", + "5,10,10", + "5,21,10", + "5,33,10", + "5,46,10", + "5,60,10") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + + @Test + def testNonPartitionedProcTimeOverWindow(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStateBackend(getStateBackend) + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setParallelism(1) + StreamITCase.testResults = mutable.MutableList() + + val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + tEnv.registerTable("MyTable", t) + + val sqlQuery = "SELECT a, " + + " SUM(c) OVER (" + + " ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " + + " MIN(c) OVER (" + + " ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " + + " FROM MyTable" + + val result = tEnv.sql(sqlQuery).toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "1,0,0", + "2,1,0", + "2,3,0", + "3,6,1", + "3,9,2", + "3,12,3", + "4,15,4", + "4,18,5", + "4,21,6", + "4,24,7", + "5,27,8", + "5,30,9", + "5,33,10", + "5,36,11", + "5,39,12") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testNonPartitionedProcTimeOverWindow2(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStateBackend(getStateBackend) + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setParallelism(1) + StreamITCase.testResults = mutable.MutableList() + + val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + tEnv.registerTable("MyTable", t) + + val sqlQuery = "SELECT a, " + + " SUM(c) OVER (" + + " ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumC , " + + " MIN(c) OVER (" + + " ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " + + " FROM MyTable" + val result = tEnv.sql(sqlQuery).toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "1,0,0", + "2,1,0", + "2,3,0", + "3,6,0", + "3,10,0", + "3,15,0", + "4,21,0", + "4,28,0", + "4,36,0", + "4,45,0", + "5,55,0", + "5,66,1", + "5,77,2", + "5,88,3", + "5,99,4") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + } object SqlITCase { http://git-wip-us.apache.org/repos/asf/flink/blob/ee033c90/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala index 45d204a..52fd5f8 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala @@ -405,4 +405,59 @@ class WindowAggregateTest extends TableTestBase { streamUtil.verifySql(sql, expected) } + @Test + def testBoundNonPartitionedProcTimeWindowWithRowRange() = { + val sql = "SELECT " + + "c, " + + "count(a) OVER (ORDER BY procTime() ROWS BETWEEN 2 preceding AND " + + "CURRENT ROW) as cnt1 " + + "from MyTable" + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "PROCTIME() AS $2") + ), + term("orderBy", "PROCTIME"), + term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"), + term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0") + ), + term("select", "c", "w0$o0 AS $1") + ) + streamUtil.verifySql(sql, expected) + } + + @Test + def testBoundPartitionedProcTimeWindowWithRowRange() = { + val sql = "SELECT " + + "c, " + + "count(a) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 preceding AND " + + "CURRENT ROW) as cnt1 " + + "from MyTable" + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "PROCTIME() AS $2") + ), + term("partitionBy", "c"), + term("orderBy", "PROCTIME"), + term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"), + term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0") + ), + term("select", "c", "w0$o0 AS $1") + ) + streamUtil.verifySql(sql, expected) + } + }