flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5715) Asynchronous snapshotting for HeapKeyedStateBackend
Date Thu, 09 Mar 2017 10:18:38 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15902840#comment-15902840 ]

ASF GitHub Bot commented on FLINK-5715:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3466#discussion_r105129267
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java ---
    @@ -68,62 +63,29 @@ public HeapReducingState(
     
     	@Override
     	public V get() {
    -		Preconditions.checkState(currentNamespace != null, "No namespace set.");
    -		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
    -
    -		Map<N, Map<K, V>> namespaceMap =
    -				stateTable.get(backend.getCurrentKeyGroupIndex());
    -
    -		if (namespaceMap == null) {
    -			return null;
    -		}
    -
    -		Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
    -
    -		if (keyedMap == null) {
    -			return null;
    -		}
    -
    -		return keyedMap.get(backend.<K>getCurrentKey());
    +		return stateTable.get(currentNamespace);
     	}
     
     	@Override
     	public void add(V value) throws IOException {
    -		Preconditions.checkState(currentNamespace != null, "No namespace set.");
    -		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
    +		final N namespace = currentNamespace;
     
     		if (value == null) {
     			clear();
     			return;
     		}
     
    -		Map<N, Map<K, V>> namespaceMap =
    -				stateTable.get(backend.getCurrentKeyGroupIndex());
    -
    -		if (namespaceMap == null) {
    -			namespaceMap = createNewMap();
    -			stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
    -		}
    -
    -		Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
    -
    -		if (keyedMap == null) {
    -			keyedMap = createNewMap();
    -			namespaceMap.put(currentNamespace, keyedMap);
    -		}
    -
    -		V currentValue = keyedMap.put(backend.<K>getCurrentKey(), value);
    +		final StateTable<K, N, V> map = stateTable;
    +		final V currentValue = map.putAndGetOld(namespace, value);
    --- End diff --
    
    I have already generalized and implemented the push-down as part of #3483 (to avoid too
    much rebasing). It would be nice if you could take a look at that as well.
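
The "push-down" in the diff moves the three-level lookup (key-group index, then namespace, then current key) out of HeapReducingState and into the state table, so the state only passes a namespace. Below is a minimal sketch of that shape. Only get(namespace) and putAndGetOld(namespace, value) appear in the diff; KeyContextSketch, StateTableSketch and ReducingStateSketch are hypothetical names, the flat (key, namespace) map is an assumption made for brevity, and the reduce step after putAndGetOld is an assumed continuation of the truncated add() method.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BinaryOperator;

// Hypothetical holder for the backend's current key.
final class KeyContextSketch<K> {
    private K currentKey;

    void setCurrentKey(K key) { this.currentKey = key; }

    K getCurrentKey() {
        if (currentKey == null) throw new IllegalStateException("No key set.");
        return currentKey;
    }
}

// Hypothetical state table that resolves the current key itself ("push-down"),
// so callers supply only the namespace. Backed by one flat map for brevity.
final class StateTableSketch<K, N, V> {
    private final KeyContextSketch<K> keyContext;
    private final Map<Map.Entry<K, N>, V> entries = new HashMap<>();

    StateTableSketch(KeyContextSketch<K> keyContext) { this.keyContext = keyContext; }

    // Composite (current key, namespace) lookup key; the key/namespace checks live here now.
    private Map.Entry<K, N> compositeKey(N namespace) {
        Objects.requireNonNull(namespace, "No namespace set.");
        return new AbstractMap.SimpleImmutableEntry<>(keyContext.getCurrentKey(), namespace);
    }

    V get(N namespace) { return entries.get(compositeKey(namespace)); }

    // Stores the value for (current key, namespace) and returns the previous value, or null.
    V putAndGetOld(N namespace, V value) { return entries.put(compositeKey(namespace), value); }

    void remove(N namespace) { entries.remove(compositeKey(namespace)); }
}

// Hypothetical reducing state written against the table, mirroring the new code in the diff.
final class ReducingStateSketch<K, N, V> {
    private final StateTableSketch<K, N, V> stateTable;
    private final BinaryOperator<V> reduceFunction;
    private N currentNamespace;

    ReducingStateSketch(StateTableSketch<K, N, V> stateTable, BinaryOperator<V> reduceFunction) {
        this.stateTable = stateTable;
        this.reduceFunction = reduceFunction;
    }

    void setCurrentNamespace(N namespace) { this.currentNamespace = namespace; }

    V get() { return stateTable.get(currentNamespace); }

    void add(V value) {
        final N namespace = currentNamespace;
        if (value == null) {
            clear();
            return;
        }
        // Assumed continuation: put the new value first, then fold in the old one if present.
        final V currentValue = stateTable.putAndGetOld(namespace, value);
        if (currentValue != null) {
            stateTable.putAndGetOld(namespace, reduceFunction.apply(currentValue, value));
        }
    }

    void clear() { stateTable.remove(currentNamespace); }
}
```

The single flat map is only a simplification. The removed lines show the original code partitioning state by key group (stateTable.get(backend.getCurrentKeyGroupIndex())), and a real table would likely keep that partitioning for key-group-wise snapshots.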


> Asynchronous snapshotting for HeapKeyedStateBackend
> ---------------------------------------------------
>
>                 Key: FLINK-5715
>                 URL: https://issues.apache.org/jira/browse/FLINK-5715
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>
> Blocking snapshots render the HeapKeyedStateBackend practically unusable for many users
> in production. Their jobs cannot tolerate stopped processing for the time it takes to write
> gigabytes of data from memory to disk. Asynchronous snapshots would be a solution to this
> problem. The challenge for the implementation is coming up with a copy-on-write (CoW) scheme
> for the in-memory hash maps that form the foundation of this backend. After taking a closer
> look, this problem is twofold. First, providing CoW semantics for the hash map itself, as a
> mutable structure, thereby avoiding costly locking or blocking where possible. Second, CoW
> for the mutable value objects, e.g. through cloning via serializers.

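The two CoW layers named in the description can be illustrated with a small versioned hash map. This is a minimal sketch, not Flink's implementation: CowMapSketch, Snapshot, getForUpdate and releaseSnapshot are hypothetical names, and it assumes a single writer thread, at most one snapshot in flight, a fixed capacity, and no removal. Taking a snapshot bumps a version counter and shallow-copies only the bucket array. Later writes copy any entry older than that version (plus its predecessors in the chain) instead of mutating it. Each copied entry deep-copies its value through a copier function, standing in for something like Flink's TypeSerializer#copy. In a real asynchronous setup the Snapshot would have to reach the I/O thread through a happens-before edge, for example by submitting it to an executor.

```java
import java.util.function.UnaryOperator;

// Hypothetical copy-on-write hash map; not Flink's implementation. Assumptions: one writer
// thread, at most one snapshot in flight, fixed capacity (no resize), no removal.
final class CowMapSketch<K, V> {

    static final class Entry<K, V> {
        final K key;
        final int hash;
        final int version; // stateVersion at the time this entry object was created
        V value;
        Entry<K, V> next;

        Entry(K key, int hash, V value, Entry<K, V> next, int version) {
            this.key = key; this.hash = hash; this.value = value; this.next = next; this.version = version;
        }
    }

    /** Frozen view handed to the (hypothetical) asynchronous snapshot writer. */
    static final class Snapshot<K, V> {
        private final Entry<K, V>[] buckets;
        Snapshot(Entry<K, V>[] buckets) { this.buckets = buckets; }
        V get(K key) { return find(buckets, key); }
    }

    private final Entry<K, V>[] table;
    private final UnaryOperator<V> valueCopier; // deep copy, standing in for a serializer's copy()
    private int stateVersion = 0;
    private int highestRequiredSnapshotVersion = 0; // 0 means no snapshot in flight

    @SuppressWarnings("unchecked")
    CowMapSketch(int capacity, UnaryOperator<V> valueCopier) {
        this.table = (Entry<K, V>[]) new Entry[capacity];
        this.valueCopier = valueCopier;
    }

    private static int indexFor(int hash, int length) { return (hash & 0x7fffffff) % length; }
    private static <K, V> V find(Entry<K, V>[] buckets, K key) {
        int hash = key.hashCode();
        for (Entry<K, V> e = buckets[indexFor(hash, buckets.length)]; e != null; e = e.next) {
            if (e.hash == hash && e.key.equals(key)) return e.value;
        }
        return null;
    }

    V get(K key) { return find(table, key); }
    /** Starts a snapshot: bumps the version and shallow-copies the bucket array only. */
    Snapshot<K, V> snapshot() {
        stateVersion++;
        highestRequiredSnapshotVersion = stateVersion;
        return new Snapshot<>(table.clone());
    }

    /** Called after the snapshot has been fully written; writers stop copying. */
    void releaseSnapshot() { highestRequiredSnapshotVersion = 0; }

    V put(K key, V value) {
        Entry<K, V> e = writableEntry(key);
        if (e != null) { V old = e.value; e.value = value; return old; }
        int idx = indexFor(key.hashCode(), table.length);
        table[idx] = new Entry<>(key, key.hashCode(), value, table[idx], stateVersion);
        return null;
    }

    /** Returns a value that may be mutated in place without affecting an in-flight snapshot. */
    V getForUpdate(K key) {
        Entry<K, V> e = writableEntry(key);
        return e == null ? null : e.value;
    }

    private Entry<K, V> writableEntry(K key) {
        int hash = key.hashCode();
        int idx = indexFor(hash, table.length);
        Entry<K, V> target = table[idx];
        while (target != null && !(target.hash == hash && target.key.equals(key))) {
            target = target.next;
        }
        if (target == null || target.version >= highestRequiredSnapshotVersion) {
            return target; // absent, or not reachable from an in-flight snapshot: mutate in place
        }
        // Replace every shared entry from the bucket head down to target with a private copy
        // (value deep-copied), so nothing the snapshot can reach is ever modified.
        Entry<K, V> current = table[idx];
        Entry<K, V> prevLive = null;
        while (true) {
            Entry<K, V> live = current.version < highestRequiredSnapshotVersion
                    ? new Entry<>(current.key, current.hash, valueCopier.apply(current.value),
                            current.next, stateVersion)
                    : current;
            if (prevLive == null) table[idx] = live; else prevLive.next = live;
            if (current == target) return live;
            prevLive = live;
            current = current.next;
        }
    }
}
```

In this sketch, taking a snapshot costs one array copy regardless of how many entries exist. Writers pay the copying cost lazily, and only for entries they touch while a snapshot is still in flight. No lock is taken on either path.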


--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
