flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9182) async checkpoints for timer service
Date Thu, 03 May 2018 10:02:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462232#comment-16462232 ]

ASF GitHub Bot commented on FLINK-9182:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r185749320
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -222,29 +228,53 @@ public void registerProcessingTimeTimer(N namespace, long time) {
     
     	@Override
     	public void registerEventTimeTimer(N namespace, long time) {
    -		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    -		Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
    -		if (timerSet.add(timer)) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace,
    +			this.knInternalTimeServiceManager.getStateTableVersion().intValue(), -1);
    +		Map<String, InternalTimer<K, N>> timerMap = getEventTimeTimerSetForTimer((K) keyContext.getCurrentKey());
    +		InternalTimer<K, N> prev = timerMap.put(timer.buildHashKey(), timer);
    +		if (prev == null) {
     			eventTimeTimersQueue.add(timer);
     		}
     	}
     
     	@Override
     	public void deleteProcessingTimeTimer(N namespace, long time) {
    -		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    -		Set<InternalTimer<K, N>> timerSet = getProcessingTimeTimerSetForTimer(timer);
    -		if (timerSet.remove(timer)) {
    +		Map<String, InternalTimer<K, N>> timerMap = getProcessingTimeTimerSetForTimer((K) keyContext.getCurrentKey());
    +		String key = InternalTimer.buildHashKey(keyContext.getCurrentKey().toString(), namespace.toString(), time);
    +		InternalTimer<K, N> timer = timerMap.get(key);
    +		if (timer != null) {
    +			timer.markDelete(this.knInternalTimeServiceManager.getStateTableVersion().intValue());
     			processingTimeTimersQueue.remove(timer);
     		}
    +		this.knInternalTimeServiceManager.getReadLock().lock();
    +		try {
    +			if (this.knInternalTimeServiceManager.getSnapshotVersions().size() == 0) {
    +				timerMap.remove(key);
    --- End diff --
    
    If a timer is deleted while a snapshot is ongoing, it looks like it could take a very long time (until the timer triggers) before the timer is truly removed? This could potentially accumulate a lot of deleted timers.
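
To illustrate the concern, here is a minimal sketch. It is not the code from the pull request: the class `TombstoneAccumulationSketch`, its plain string hash keys and the `snapshotRunning` flag are hypothetical simplifications of the delete path shown in the diff, where a timer is only removed from the map if no snapshot is registered at the time of the delete.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the timer map in the diff; not Flink code. */
class TombstoneAccumulationSketch {
    // hash key -> delete version (-1 while the timer is live)
    private final Map<String, Integer> timers = new HashMap<>();
    private boolean snapshotRunning;
    private int version;

    void register(String hashKey) {
        timers.put(hashKey, -1);
    }

    void startSnapshot() {
        version++;
        snapshotRunning = true;
    }

    void endSnapshot() {
        // Assumption mirrored from the diff: finishing a snapshot does not
        // revisit timers that were only marked as deleted.
        snapshotRunning = false;
    }

    void delete(String hashKey) {
        if (!timers.containsKey(hashKey)) {
            return;
        }
        timers.put(hashKey, version);   // logical delete: record the delete version
        if (!snapshotRunning) {
            timers.remove(hashKey);     // physical delete only if no snapshot is running
        }
    }

    /** Number of entries that are marked as deleted but still held in the map. */
    long tombstones() {
        return timers.values().stream().filter(v -> v != -1).count();
    }
}
```

In this simplified model, registering many timers, starting a snapshot, deleting all of them and then ending the snapshot leaves every deleted timer in the map until something else removes it.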


> async checkpoints for timer service
> -----------------------------------
>
>                 Key: FLINK-9182
>                 URL: https://issues.apache.org/jira/browse/FLINK-9182
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.4.2
>            Reporter: makeyang
>            Assignee: makeyang
>            Priority: Minor
>             Fix For: 1.4.3, 1.5.1
>
>
> # problem description:
>  ## As the number of 'InternalTimer' objects grows, checkpoints become slower and slower.
>  # improvement design
>  ## Maintain a stateTableVersion and a snapshotVersions in InternalTimeServiceManager, each serving exactly the same purpose as its counterpart in CopyOnWriteStateTable. In addition, add a read-write lock that protects snapshotVersions and stateTableVersion.
>  ## For each InternalTimer, add two properties, create version and delete version, beside the three existing ones: timestamp, key and namespace. Each time a timer is registered in the timer service, it is created with the current stateTableVersion as its create version and -1 as its delete version. Each time a timer is deleted in the timer service, it is only marked as deleted by setting its delete version to the current stateTableVersion; it is not physically removed from the timer service.
>  ## Each time a snapshot of the timers is started, InternalTimeServiceManager increments its stateTableVersion and adds the new value to snapshotVersions. These two operations are protected by the write lock of InternalTimeServiceManager. The new stateTableVersion is taken as the snapshot version of this snapshot.
>  ## Shallow-copy the <String, HeapInternalTimerService> tuples.
>  ## Then use another thread to asynchronously snapshot everything: the key serializer, the namespace serializer and the timers. A timer that is not deleted (delete version is -1) is serialized if its create version is less than the snapshot version. A timer whose delete version is not -1 is serialized if its delete version is greater than or equal to the snapshot version. Any other timer is not serialized by this snapshot.
>  ## When everything is serialized, remove the snapshot version from snapshotVersions. This still happens in the other thread and is guarded by the write lock.
>  ## Last, physical deletion of timers happens in two places. First, in the delete path: after a timer has been marked as deleted with the current stateTableVersion, check whether snapshotVersions is empty (which means that no snapshot is running) and, if so, physically delete the timer. Second, while a snapshot iterates over the timers: when a timer's delete version is less than the minimum value in snapshotVersions, the timer is deleted and no running snapshot needs to keep it, so it is physically removed.
>  ## One more change: processingTimeTimers and eventTimeTimers for each group used to be a HashSet and are now a ConcurrentHashMap with key + namespace + timestamp as the hash key.
>  # related mailing list thread
>  ## http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Slow-flink-checkpoint-td18946.html
>  # github pull request
>  ## //coming soon
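
The versioning rules in the description above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the pull request's code: `VersionedTimerSketch`, its nested `Timer` class, the string hash keys and all method names are hypothetical, and re-registering a timer that is currently marked as deleted is left out of scope. The visibility check implements the two serialization conditions exactly as the description words them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical, simplified model of the proposal; not Flink's actual classes. */
class VersionedTimerSketch {
    static final int NOT_DELETED = -1;

    static final class Timer {
        final String hashKey;                      // key + namespace + timestamp
        final int createVersion;
        volatile int deleteVersion = NOT_DELETED;

        Timer(String hashKey, int createVersion) {
            this.hashKey = hashKey;
            this.createVersion = createVersion;
        }
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final TreeSet<Integer> snapshotVersions = new TreeSet<>();
    private final Map<String, Timer> timers = new ConcurrentHashMap<>();
    private int stateTableVersion = 0;

    static String hashKey(String key, String namespace, long timestamp) {
        return key + "|" + namespace + "|" + timestamp;
    }

    /** Serialization rule from the description, for one timer and one snapshot. */
    static boolean isVisible(int createVersion, int deleteVersion, int snapshotVersion) {
        if (deleteVersion == NOT_DELETED) {
            return createVersion < snapshotVersion;
        }
        return deleteVersion >= snapshotVersion;
    }

    void register(String key, String namespace, long timestamp) {
        lock.readLock().lock();
        try {
            String hk = hashKey(key, namespace, timestamp);
            timers.putIfAbsent(hk, new Timer(hk, stateTableVersion));
        } finally {
            lock.readLock().unlock();
        }
    }

    void delete(String key, String namespace, long timestamp) {
        lock.readLock().lock();
        try {
            String hk = hashKey(key, namespace, timestamp);
            Timer timer = timers.get(hk);
            if (timer == null) {
                return;
            }
            timer.deleteVersion = stateTableVersion;   // logical delete
            if (snapshotVersions.isEmpty()) {
                timers.remove(hk);                     // physical delete: no snapshot is running
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    int beginSnapshot() {
        lock.writeLock().lock();
        try {
            stateTableVersion++;
            snapshotVersions.add(stateTableVersion);
            return stateTableVersion;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Returns the hash keys this snapshot would serialize and purges stale deleted timers. */
    List<String> collectForSnapshot(int snapshotVersion) {
        lock.readLock().lock();
        try {
            int minRunning = snapshotVersions.isEmpty() ? Integer.MAX_VALUE : snapshotVersions.first();
            List<String> visible = new ArrayList<>();
            for (Timer timer : timers.values()) {
                int deleteVersion = timer.deleteVersion;
                if (deleteVersion != NOT_DELETED && deleteVersion < minRunning) {
                    timers.remove(timer.hashKey);      // no running snapshot needs it
                } else if (isVisible(timer.createVersion, deleteVersion, snapshotVersion)) {
                    visible.add(timer.hashKey);
                }
            }
            return visible;
        } finally {
            lock.readLock().unlock();
        }
    }

    void endSnapshot(int snapshotVersion) {
        lock.writeLock().lock();
        try {
            snapshotVersions.remove(Integer.valueOf(snapshotVersion));
        } finally {
            lock.writeLock().unlock();
        }
    }

    int size() {
        return timers.size();
    }
}
```

In this sketch, a timer deleted between `beginSnapshot()` and `endSnapshot()` stays in the map after the snapshot ends and is only purged when a later snapshot iterates over the timers. A real implementation would also need to decide how long the iterating thread may hold the read lock; the sketch holds it for the whole iteration for simplicity.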



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
