flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Timo Walther (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (FLINK-6368) Grouping keys in stream aggregations have wrong order
Date Tue, 25 Apr 2017 14:32:04 GMT

     [ https://issues.apache.org/jira/browse/FLINK-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Timo Walther resolved FLINK-6368.
---------------------------------
       Resolution: Fixed
    Fix Version/s: 1.3.0

Fixed in 1.3.0: 05088b4a61001b536b3d07e49c415606edf11fba

> Grouping keys in stream aggregations have wrong order
> -----------------------------------------------------
>
>                 Key: FLINK-6368
>                 URL: https://issues.apache.org/jira/browse/FLINK-6368
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Xingcan Cui
>             Fix For: 1.3.0
>
>
> FLINK-5768 removed the `AggregateUtil.createPrepareMapFunction` stage. It seems that
the order of grouping keys is sometimes messed up. The following tests fails:
> {code}
>   @Test
>   def testEventTimeSlidingGroupWindow(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     StreamITCase.testResults = mutable.MutableList()
>     val stream = env
>       .fromCollection(data)
>       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
>       .map(t => (t._2, t._6))
>     val table = stream.toTable(tEnv, 'int, 'string)
>     val windowedTable = table
>       .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
>       .groupBy('w, 'string)
>       .select('string, 'int.count, 'w.start, 'w.end)
>     val results = windowedTable.toDataStream[Row]
>     results.addSink(new StreamITCase.StringSink)
>     env.execute()
>   }
> {code}
> Exception:
> {code}
> Caused by: java.lang.RuntimeException: Could not forward element to next operator
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:532)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:505)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:485)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:871)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:849)
> 	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> 	at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:50)
> 	at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:29)
> 	at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:74)
> 	at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:64)
> 	at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:35)
> 	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:45)
> 	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:598)
> 	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:505)
> 	at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:276)
> 	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:119)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:940)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
> 	... 7 more
> Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message