flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6061) NPE on TypeSerializer.serialize with a RocksDBStateBackend calling entries() on a keyed state in the open() function
Date Wed, 15 Mar 2017 12:30:41 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15926054#comment-15926054
] 

ASF GitHub Bot commented on FLINK-6061:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3545
  
    Good fix, +1 to merge


> NPE on TypeSerializer.serialize with a RocksDBStateBackend calling entries() on a keyed
state in the open() function
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6061
>                 URL: https://issues.apache.org/jira/browse/FLINK-6061
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, State Backends, Checkpointing, Streaming
>    Affects Versions: 1.3.0
>            Reporter: Vladislav Pernin
>            Assignee: Stefan Richter
>
> With a default state (heap), the call to state.entries() "nicely fails" with a IllegalStateException
:
> {noformat}
> Caused by: java.lang.IllegalStateException: No key set.
> 	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> 	at org.apache.flink.runtime.state.heap.HeapMapState.entries(HeapMapState.java:188)
> 	at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
> 	at org.apache.flink.Reproducer$FailingMapWithState.open(Reproducer.java:78)
> 	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
> With a RocksDBStateBackend, it fails with a NPE :
> {noformat}
> Caused by: java.lang.NullPointerException
> 	at org.apache.flink.api.common.typeutils.base.ShortSerializer.serialize(ShortSerializer.java:64)
> 	at org.apache.flink.api.common.typeutils.base.ShortSerializer.serialize(ShortSerializer.java:27)
> 	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKey(AbstractRocksDBState.java:181)
> 	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
> 	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.writeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:148)
> 	at org.apache.flink.contrib.streaming.state.RocksDBMapState.serializeCurrentKeyAndNamespace(RocksDBMapState.java:263)
> 	at org.apache.flink.contrib.streaming.state.RocksDBMapState.iterator(RocksDBMapState.java:196)
> 	at org.apache.flink.contrib.streaming.state.RocksDBMapState.entries(RocksDBMapState.java:143)
> 	at org.apache.flink.runtime.state.UserFacingMapState.entries(UserFacingMapState.java:77)
> 	at org.apache.flink.Reproducer$FailingMapWithState.open(Reproducer.java:78)
> 	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The reason is that the record is null, because backend.getCurrentKey() is null (not yet
set) in AbstractRocksDBState.
> This may also be the case for other RockDBState implementations.
> You can find the reproducer here based on 1.3-SNAPSHOT (needed for the MapState) :
> https://github.com/vpernin/flink-rocksdbstate-npe
> The reproducer is a non sense application. There is no MapState with TTL or expiration
yet, so the goal is to try to shrink or expire the state at some interval.
> This could be done by iterating over the entries of the state and removing some of them.
> This could probably not be done in the open() method of a rich function.
> I also tried to implement CheckpointListener and to access the state content in notifyCheckpointComplete()
method, but it fails to, I guess due to the asynchronous nature of the checkpoint.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message