flink-issues mailing list archives

From "Jark Wu (Jira)" <j...@apache.org>
Subject [jira] [Assigned] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration
Date Wed, 01 Jul 2020 02:10:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu reassigned FLINK-18452:
-------------------------------

    Assignee: Weike Dong

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18452
>                 URL: https://issues.apache.org/jira/browse/FLINK-18452
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.10.0, 1.10.1, 1.11.0
>            Reporter: Weike Dong
>            Assignee: Weike Dong
>            Priority: Major
>         Attachments: c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> We found that in SQL jobs using the "Top-N" functionality provided by the Blink planner, the job state cannot be accessed after restoration because the state serializers are reported as "incompatible" (in fact, they are compatible).
> The error log is shown below:
> {panel:title=taskmanager.log}
> 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, serverid, quantity]) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Error while getting state
>         at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>         at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>         at org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
>         at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>         at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
>         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
>         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
>         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
>         at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>         at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
>         at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
>         at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
>         at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>         at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>         ... 13 more{panel}
>  
> After careful debugging, we found that the problem lies in the compatibility check of the type serializers.
>  
> In short, during checkpointing, Flink serializes the _SortedMapSerializer_ by creating a _SortedMapSerializerSnapshot_ object. The original comparator (here we call it _StreamExecSortComparator$579_) is encapsulated within that object.
>  
> At restoration, the snapshot object is read and restored as normal. However, when the RetractableTopNFunction instance is constructed, Flink passes in another comparator as an argument (we call it _StreamExecSortComparator$626_). This comparator is later used in the _ValueStateDescriptor_, which acts as the key into the state store.
>  
> Here comes the problem. The newly restored Flink program tries to access state (_getState_) through the previously mentioned _ValueStateDescriptor_. The state backend first checks whether the comparator in the state descriptor is compatible with the one in the snapshot. This check eventually reaches the _equals_ method of the _RetractableTopNFunction.ComparatorWrapper_ class.
>  
> The equals method contains the following code:
> {code:java}
> return generatedRecordComparator.getClassName().equals(oGeneratedComparator.getClassName()) &&
>       generatedRecordComparator.getCode().equals(oGeneratedComparator.getCode()) &&
>       Arrays.equals(generatedRecordComparator.getReferences(), oGeneratedComparator.getReferences());
> {code}
> After debugging, we found that the class name of the comparator within the snapshot is _StreamExecSortComparator$579_, while the class name of the comparator provided in the new job is _StreamExecSortComparator$626_. Hence this method always returns false, even though the two comparators are in fact compatible (they behave identically). Moreover, the code for each comparator is generated independently, so the corresponding variables within the two comparators are very likely to have different names (_isNullA$581_ vs _isNullA$682_). Comparing the generated code therefore does not help either.
>  
> Hence we believe that the implementation of the equals method is seriously flawed and should be fixed in a later release.
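The following is a minimal, self-contained Java sketch of the failure described above, together with one possible direction for a fix. It is not Flink code. The classes `GeneratedInfo`, `NameBasedWrapper`, `SortSpec`, `SpecBasedWrapper` and `ComparatorEqualityDemo` are hypothetical stand-ins.

`NameBasedWrapper` copies the comparison from the equals snippet quoted above. The simulated generator reuses the identifiers reported in this issue.

`SpecBasedWrapper` illustrates one assumption about how equality could be decided: from the sort specification (key indices, direction, null ordering) instead of from the generated class name and code. It is not necessarily how Flink resolves this issue.

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical demo: none of these classes are Flink APIs.
final class ComparatorEqualityDemo {
    // Simulates a code generator with a global counter: the same ORDER BY clause
    // yields a differently named class and differently named variables on each run.
    static GeneratedInfo generate(int classId, int varId) {
        String name = "StreamExecSortComparator$" + classId;
        String code = "public final class " + name + " { boolean isNullA$" + varId + "; }";
        return new GeneratedInfo(name, code, new Object[0]);
    }

    // ORDER BY quantity DESC; field index 3 is assumed for illustration only.
    static SortSpec quantityDesc() {
        return new SortSpec(new int[] {3}, new boolean[] {false}, new boolean[] {true});
    }

    public static void main(String[] args) {
        GeneratedInfo restored = generate(579, 581); // comparator found in the snapshot
        GeneratedInfo fresh = generate(626, 682);    // comparator built by the restarted job
        System.out.println(new NameBasedWrapper(restored)
            .equals(new NameBasedWrapper(fresh)));                 // false
        System.out.println(new SpecBasedWrapper(restored, quantityDesc())
            .equals(new SpecBasedWrapper(fresh, quantityDesc()))); // true
    }
}

// The three fields inspected by the equals method quoted in this issue.
final class GeneratedInfo {
    final String className;
    final String code;
    final Object[] references;
    GeneratedInfo(String className, String code, Object[] references) {
        this.className = className;
        this.code = code;
        this.references = references;
    }
}

// Mirrors the quoted logic: class name, generated code and references must all match.
final class NameBasedWrapper {
    final GeneratedInfo gen;
    NameBasedWrapper(GeneratedInfo gen) { this.gen = gen; }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof NameBasedWrapper)) return false;
        GeneratedInfo other = ((NameBasedWrapper) o).gen;
        return gen.className.equals(other.className)
            && gen.code.equals(other.code)
            && Arrays.equals(gen.references, other.references);
    }

    @Override
    public int hashCode() { return Objects.hash(gen.className, gen.code); }
}

// Hypothetical sort description: key field indices, sort direction and null placement.
final class SortSpec {
    final int[] keys;
    final boolean[] ascending;
    final boolean[] nullsLast;
    SortSpec(int[] keys, boolean[] ascending, boolean[] nullsLast) {
        this.keys = keys;
        this.ascending = ascending;
        this.nullsLast = nullsLast;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SortSpec)) return false;
        SortSpec s = (SortSpec) o;
        return Arrays.equals(keys, s.keys)
            && Arrays.equals(ascending, s.ascending)
            && Arrays.equals(nullsLast, s.nullsLast);
    }

    @Override
    public int hashCode() {
        return Objects.hash(Arrays.hashCode(keys), Arrays.hashCode(ascending), Arrays.hashCode(nullsLast));
    }
}

// Alternative: still carry the generated comparator, but decide equality from the
// sort specification it was generated from, which is stable across restarts.
final class SpecBasedWrapper {
    final GeneratedInfo gen; // deliberately ignored by equals and hashCode
    final SortSpec spec;
    SpecBasedWrapper(GeneratedInfo gen, SortSpec spec) {
        this.gen = gen;
        this.spec = spec;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof SpecBasedWrapper && spec.equals(((SpecBasedWrapper) o).spec);
    }

    @Override
    public int hashCode() { return spec.hashCode(); }
}
```

Running it should print `false` and then `true`. A real fix would also have to persist such a specification (including the key types) with the snapshot, so that the restored side has something stable to compare against. This sketch leaves that out.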



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
