flink-issues mailing list archives

From "Hequn Cheng (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.
Date Wed, 13 Jun 2018 02:40:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510544#comment-16510544
] 

Hequn Cheng commented on FLINK-9528:
------------------------------------

You are right, we should not add state to all filters. We should add state only when the filter
is working under AccMode and with a UniqueKey (maybe we can give such a filter a name, say, *UpsertFilter*).


Consider the case Source -> Agg -> UpsertFilter -> UpsertSink: we need to add
state to the UpsertFilter so that it can send a delete message when necessary (i.e., when the
predicate evaluates to false). To make sure that messages with the same unique key go to the
same place, we need to do a keyBy in UpsertFilter.
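
A minimal sketch of the idea, in plain Scala rather than against the Flink runtime. The names `Change`, `Upsert`, `Delete`, and `UpsertFilterSketch` are hypothetical and are not Flink classes. The in-memory set stands in for keyed state that an actual operator would presumably hold after the keyBy on the unique key.

```scala
import scala.collection.mutable

// Hypothetical message types for illustration; these are not Flink classes.
sealed trait Change
final case class Upsert(key: Int, value: Long) extends Change
final case class Delete(key: Int) extends Change

// Sketch of a stateful filter: it remembers, per unique key, whether a row
// for that key is currently visible downstream.
final class UpsertFilterSketch(predicate: Long => Boolean) {
  // Keys whose most recent forwarded message was an upsert.
  // Assumption: a real operator would keep this in keyed state.
  private val emitted = mutable.Set.empty[Int]

  def process(msg: Change): Option[Change] = msg match {
    case u @ Upsert(key, value) if predicate(value) =>
      emitted += key
      Some(u)
    case Upsert(key, _) =>
      // Predicate is false: send a delete only if an earlier version
      // of this key was forwarded; otherwise there is nothing to remove.
      if (emitted.remove(key)) Some(Delete(key)) else None
    case d @ Delete(key) =>
      // Forward a delete only if downstream actually holds the row.
      if (emitted.remove(key)) Some(d) else None
  }
}

object UpsertFilterDemo {
  def main(args: Array[String]): Unit = {
    val filter = new UpsertFilterSketch(_ < 7)
    val input = List(Upsert(10, 6), Upsert(10, 7), Upsert(2, 1), Upsert(9, 9))
    // Upsert(10, 7) fails the predicate and becomes Delete(10);
    // Upsert(9, 9) is dropped because key 9 was never forwarded.
    println(input.flatMap(m => filter.process(m)))
  }
}
```

The state is what lets the filter avoid sending deletes for keys the sink has never seen, which is why the state should be limited to this one kind of filter rather than added to every filter.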

> Incorrect results: Filter does not treat Upsert messages correctly.
> -------------------------------------------------------------------
>
>                 Key: FLINK-9528
>                 URL: https://issues.apache.org/jira/browse/FLINK-9528
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.3, 1.5.0, 1.4.2
>            Reporter: Fabian Hueske
>            Assignee: Hequn Cheng
>            Priority: Critical
>
> Currently, Filters (i.e., Calcs with predicates) do not distinguish between retraction
and upsert mode. A Calc looks at a record (regardless of its update semantics) and either discards
it (predicate evaluates to false) or passes it on (predicate evaluates to true).
> This works fine for messages with retraction semantics but is not correct for upsert
messages.
> The following test case (can be pasted into {{TableSinkITCase}}) shows the problem:
> {code:java}
>   @Test
>   def testUpsertsWithFilter(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.getConfig.enableObjectReuse()
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     val t = StreamTestData.get3TupleDataStream(env)
>       .assignAscendingTimestamps(_._1.toLong)
>       .toTable(tEnv, 'id, 'num, 'text)
>     t.select('text.charLength() as 'len)
>       .groupBy('len)
>       .select('len, 'len.count as 'cnt)
>       // .where('cnt < 7)
>       .writeToSink(new TestUpsertSink(Array("len"), false))
>     env.execute()
>     val results = RowCollector.getAndClearValues
>     val retracted = RowCollector.upsertResults(results, Array(0)).sorted
>     val expectedWithoutFilter = List(
>       "2,1", "5,1", "9,9", "10,7", "11,1", "14,1", "25,1").sorted
>     val expectedWithFilter = List(
>       "2,1", "5,1", "11,1", "14,1", "25,1").sorted
>     assertEquals(expectedWithoutFilter, retracted)
>     // assertEquals(expectedWithFilter, retracted)
>   }
> {code}
> When we add a filter on the aggregation result, we would expect that all rows that do
not fulfill the condition are removed from the result. However, the filter only removes the
upsert message such that the previous version remains in the result.
> One solution could be to make a filter aware of the update semantics (retract or upsert)
and convert the upsert message into a delete message if the predicate evaluates to false.
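
The effect described in the issue can be reproduced without Flink. The following is a small sketch under stated assumptions: the message model (`Msg`, where `Some(cnt)` is an upsert and `None` is a delete) and the helpers `materialize`, `dropFilter`, and `deleteFilter` are hypothetical names chosen for illustration, and `materialize` only approximates what an upsert sink keyed on `len` would do.

```scala
object UpsertFilterProblemDemo {
  // Hypothetical message model: (len, Some(cnt)) is an upsert for key `len`,
  // (len, None) is a delete for that key.
  type Msg = (Int, Option[Long])

  // Apply an upsert stream to a table, roughly as an upsert sink would.
  def materialize(msgs: Seq[Msg]): Map[Int, Long] =
    msgs.foldLeft(Map.empty[Int, Long]) {
      case (table, (len, Some(cnt))) => table + (len -> cnt)
      case (table, (len, None))      => table - len
    }

  // Behaviour described in the issue: an upsert that fails the predicate
  // is silently dropped (deletes pass through unchanged).
  def dropFilter(msgs: Seq[Msg], p: Long => Boolean): Seq[Msg] =
    msgs.filter { case (_, cnt) => cnt.forall(p) }

  // Proposed behaviour: an upsert that fails the predicate becomes a delete.
  def deleteFilter(msgs: Seq[Msg], p: Long => Boolean): Seq[Msg] =
    msgs.map {
      case (len, Some(cnt)) if !p(cnt) => (len, Option.empty[Long])
      case other                       => other
    }

  def main(args: Array[String]): Unit = {
    // The count for len = 10 grows from 1 to 7, matching the "10,7" row
    // in expectedWithoutFilter.
    val updates: Seq[Msg] = List.tabulate(7)(i => (10, Option(i + 1L)))
    println(materialize(dropFilter(updates, _ < 7)))   // Map(10 -> 6): stale row
    println(materialize(deleteFilter(updates, _ < 7))) // Map(): row removed
  }
}
```

With the dropping filter the sink keeps the outdated row for len 10 with count 6, although the current count of 7 does not satisfy `cnt < 7`. Converting the failing upsert into a delete removes the row, which is consistent with `expectedWithFilter` containing no entry for len 10.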



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
