flink-issues mailing list archives

From "Kostas Kloudas (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-9060) Deleting state using KeyedStateBackend.getKeys() throws Exception
Date Fri, 23 Mar 2018 10:51:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411172#comment-16411172 ]

Kostas Kloudas edited comment on FLINK-9060 at 3/23/18 10:50 AM:
-----------------------------------------------------------------

I would say that this code reveals a real bug, but in the wrong place.

As its javadoc states ("Modifications to the state during iterating over it keys are not
supported."), the {{getKeys()}} method is not meant to be used for modifying the state. I would
recommend not changing this, because the performance benefit of retrieving the keys lazily,
without worrying about memory quotas, may outweigh the shortcomings elsewhere. I assume this
is the reason why [~pnowojski] introduced it.
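To illustrate why the javadoc rules out modifications during iteration, here is a minimal sketch in plain Java. It assumes a {{HashMap}} as a stand-in for the heap backend's state table (Flink's real heap state tables are different classes, and the exact exception they throw may differ). The hypothetical {{getKeys}} helper returns a lazy stream over the live key set, and deleting entries while that stream is still being consumed fails with a {{ConcurrentModificationException}}.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

public class LazyGetKeysDemo {

    // Hypothetical lazy key view in the spirit of getKeys(): the stream reads
    // directly from the live map, so the key set is never copied.
    public static <K, V> Stream<K> getKeys(Map<K, V> table) {
        return table.keySet().stream();
    }

    // Deletes every key while the lazy stream is still being consumed and
    // reports whether the traversal failed with a ConcurrentModificationException.
    public static boolean deleteWhileIterating(Map<Integer, String> table) {
        try {
            getKeys(table).forEach(table::remove);
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Stand-in "state": key -> value, mirroring the two keys in the test below.
        Map<Integer, String> state = new HashMap<>();
        state.put(1, "Hello");
        state.put(2, "Ciao");
        System.out.println(deleteWhileIterating(state)); // prints "true"
    }
}
```

The laziness is what makes the stream cheap, and it is also what makes it fragile: the traversal and the deletions share the same underlying map.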

That said, this issue reveals that the {{applyToAllKeys()}} implementation is problematic, and that
is where the fix should be applied, not in the {{getKeys()}} implementation. For {{RocksDB}}, the
current implementation may be fine as it is, but the implementation for the {{HeapStateBackend}}
must change.

As for the specifics of the implementation: creating a full list of all the keys carries the risk
that the whole key set may not fit in memory, but we may be able to make this assumption. If
we do, then a plain old for-loop would be more efficient than streams. Again, this
modification should apply only to the {{HeapStateBackend}}.
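A minimal sketch of the eager approach for the heap case, again assuming a plain {{HashMap}} in place of the real state table. The {{applyToAllKeys}} helper below is hypothetical and does not use Flink's actual signature. It first copies the full key set into a list and then visits that snapshot with a plain for-loop, so the callback is free to delete state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class EagerApplyToAllKeys {

    // Hypothetical helper loosely modelled on applyToAllKeys(): the callback
    // receives each key together with the live table, so it may modify or
    // delete state for that key.
    public static <K, V> void applyToAllKeys(Map<K, V> table, BiConsumer<K, Map<K, V>> function) {
        // Materialize the whole key set first. This assumes the key set fits
        // in memory, which is the trade-off discussed above.
        List<K> keys = new ArrayList<>(table.keySet());
        // Plain for-loop over the snapshot: changes to 'table' made by the
        // callback no longer interfere with the iteration.
        for (K key : keys) {
            function.accept(key, table);
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> state = new HashMap<>();
        state.put(1, "Hello");
        state.put(2, "Ciao");
        // Delete the state of every key, as the test in this issue tries to do.
        applyToAllKeys(state, (key, table) -> table.remove(key));
        System.out.println(state.isEmpty()); // prints "true"
    }
}
```

Both keys are deleted without an exception. The extra {{ArrayList}} is the memory cost mentioned above: one reference per key, held for the duration of the call.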

The above also implies that the test should call the {{applyToAllKeys()}} method.

Any comments on the above, [~aljoscha], [~sihuazhou] and [~pnowojski]?



> Deleting state using KeyedStateBackend.getKeys() throws Exception
> -----------------------------------------------------------------
>
>                 Key: FLINK-9060
>                 URL: https://issues.apache.org/jira/browse/FLINK-9060
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>            Reporter: Aljoscha Krettek
>            Assignee: Sihua Zhou
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> Adding this test to {{StateBackendTestBase}} showcases the problem:
> {code}
> @Test
> public void testConcurrentModificationWithGetKeys() throws Exception {
> 	AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
> 	try {
> 		ListStateDescriptor<String> listStateDescriptor =
> 			new ListStateDescriptor<>("foo", StringSerializer.INSTANCE);
> 		backend.setCurrentKey(1);
> 		backend
> 			.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor)
> 			.add("Hello");
> 		backend.setCurrentKey(2);
> 		backend
> 			.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor)
> 			.add("Ciao");
> 		Stream<Integer> keys = backend
> 			.getKeys(listStateDescriptor.getName(), VoidNamespace.INSTANCE);
> 		keys.forEach((key) -> {
> 			backend.setCurrentKey(key);
> 			try {
> 				backend
> 					.getPartitionedState(
> 						VoidNamespace.INSTANCE,
> 						VoidNamespaceSerializer.INSTANCE,
> 						listStateDescriptor)
> 					.clear();
> 			} catch (Exception e) {
> 				e.printStackTrace();
> 			}
> 		});
> 	}
> 	finally {
> 		IOUtils.closeQuietly(backend);
> 		backend.dispose();
> 	}
> }
> {code}
> This should work, because one of the use cases of {{getKeys()}} and {{applyToAllKeys()}}
> is to do something for every key, and that includes deleting them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
