flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9423) Implement efficient deletes for heap based timer service
Date Tue, 29 May 2018 14:42:02 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16493599#comment-16493599 ]

ASF GitHub Bot commented on FLINK-9423:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6062#discussion_r191413234
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -301,114 +259,39 @@ public void advanceWatermark(long time) throws Exception {
     				keySerializer.snapshotConfiguration(),
     				namespaceSerializer,
     				namespaceSerializer.snapshotConfiguration(),
    -				getEventTimeTimerSetForKeyGroup(keyGroupIdx),
    -				getProcessingTimeTimerSetForKeyGroup(keyGroupIdx));
    +				eventTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx),
    +				processingTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx));
     	}
     
     	/**
     	 * Restore the timers (both processing and event time ones) for a given {@code keyGroupIdx}.
     	 *
    -	 * @param restoredTimersSnapshot the restored snapshot containing the key-group's timers,
    +	 * @param restoredSnapshot the restored snapshot containing the key-group's timers,
     	 *                       and the serializers that were used to write them
     	 * @param keyGroupIdx the id of the key-group to be put in the snapshot.
     	 */
     	@SuppressWarnings("unchecked")
    -	public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> restoredTimersSnapshot, int keyGroupIdx) throws IOException {
    -		this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) restoredTimersSnapshot;
    +	public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> restoredSnapshot, int keyGroupIdx) {
    +		this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) restoredSnapshot;
     
    -		if ((this.keyDeserializer != null && !this.keyDeserializer.equals(restoredTimersSnapshot.getKeySerializer())) ||
    -			(this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredTimersSnapshot.getNamespaceSerializer()))) {
    +		if ((this.keyDeserializer != null && !this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) ||
    --- End diff --
    
    This check could be factored out into a method with a meaningful, easy-to-understand name, e.g. `checkSerializerCompatibility`.
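
To illustrate the suggestion, here is a minimal, self-contained sketch of what such a helper could look like. It is not the code from the pull request: the class name `SerializerCheckSketch`, the `isCompatible` helper, the use of plain `Object` in place of Flink's serializer types, and the choice of `IllegalArgumentException` are all assumptions made so that the example runs on its own.

```java
// Sketch only: plain Objects stand in for Flink's serializer types.
final class SerializerCheckSketch {

    /**
     * Hypothetical helper. A serializer that has not been set yet (null)
     * is treated as compatible with anything; otherwise equality is required.
     */
    static boolean isCompatible(Object current, Object restored) {
        return current == null || current.equals(restored);
    }

    /**
     * Mirrors the condition in the diff: fails if either the key or the
     * namespace serializer is already set and differs from the restored one.
     * The exception type and message are assumptions.
     */
    static void checkSerializerCompatibility(
            Object keyDeserializer,
            Object restoredKeySerializer,
            Object namespaceDeserializer,
            Object restoredNamespaceSerializer) {

        if (!isCompatible(keyDeserializer, restoredKeySerializer)
                || !isCompatible(namespaceDeserializer, restoredNamespaceSerializer)) {
            throw new IllegalArgumentException(
                "Tried to restore timers with incompatible serializers.");
        }
    }
}
```

With a helper like this, the body of `restoreTimersForKeyGroup` would shrink to a single, self-describing call followed by the actual restore logic.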


> Implement efficient deletes for heap based timer service
> --------------------------------------------------------
>
>                 Key: FLINK-9423
>                 URL: https://issues.apache.org/jira/browse/FLINK-9423
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.5.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>             Fix For: 1.6.0
>
>
> The current data structures in the `HeapInternalTimerService` do not support efficient timer deletes; the complexity is currently O(n), where n is the number of registered timers.
>  
> We can keep track of timers' positions in the priority queue and (in combination with the already existing set/map) have a more efficient algorithm for deletes.
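
The position-tracking idea in the description above can be sketched as a binary min-heap whose elements remember their own array index, so that a delete runs in O(log n) instead of scanning all n timers. This is an illustrative sketch under stated assumptions, not the implementation from the pull request: `IndexedTimer`, `IndexedTimerHeap`, and the `heapIndex` field are hypothetical names, and timers are ordered by timestamp only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical timer element that remembers its position in the heap array. */
final class IndexedTimer {
    final long timestamp;
    int heapIndex = -1; // -1 means "currently not in the queue"

    IndexedTimer(long timestamp) {
        this.timestamp = timestamp;
    }
}

/** Binary min-heap ordered by timestamp, with O(log n) removal of any element. */
final class IndexedTimerHeap {
    private final List<IndexedTimer> heap = new ArrayList<>();

    int size() {
        return heap.size();
    }

    IndexedTimer peek() {
        return heap.isEmpty() ? null : heap.get(0);
    }

    void add(IndexedTimer timer) {
        timer.heapIndex = heap.size();
        heap.add(timer);
        siftUp(timer.heapIndex);
    }

    /** Removes the given timer using its stored index; no linear search needed. */
    boolean remove(IndexedTimer timer) {
        int i = timer.heapIndex;
        if (i < 0 || i >= heap.size() || heap.get(i) != timer) {
            return false; // not contained in this heap
        }
        int last = heap.size() - 1;
        swap(i, last);          // move the victim to the end
        heap.remove(last);      // O(1) removal of the last slot
        timer.heapIndex = -1;
        if (i < heap.size()) {  // restore heap order for the element moved into slot i
            siftUp(i);
            siftDown(i);
        }
        return true;
    }

    private void siftUp(int i) {
        while (i > 0) {
            int parent = (i - 1) / 2;
            if (heap.get(parent).timestamp <= heap.get(i).timestamp) {
                break;
            }
            swap(i, parent);
            i = parent;
        }
    }

    private void siftDown(int i) {
        int n = heap.size();
        while (true) {
            int left = 2 * i + 1;
            int right = left + 1;
            int smallest = i;
            if (left < n && heap.get(left).timestamp < heap.get(smallest).timestamp) {
                smallest = left;
            }
            if (right < n && heap.get(right).timestamp < heap.get(smallest).timestamp) {
                smallest = right;
            }
            if (smallest == i) {
                return;
            }
            swap(i, smallest);
            i = smallest;
        }
    }

    /** Swaps two slots and keeps each element's stored index in sync. */
    private void swap(int a, int b) {
        IndexedTimer x = heap.get(a);
        IndexedTimer y = heap.get(b);
        heap.set(a, y);
        y.heapIndex = a;
        heap.set(b, x);
        x.heapIndex = b;
    }
}
```

In this sketch the caller must hold the exact `IndexedTimer` instance to delete. The existing set/map mentioned in the description would play that role: look up the registered instance for a given key, namespace, and timestamp, then pass it to `remove`.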



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
