flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB
Date Fri, 17 Mar 2017 15:46:44 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930175#comment-15930175 ]

ASF GitHub Bot commented on FLINK-5544:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3359#discussion_r106670661
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java ---
    @@ -18,43 +18,306 @@
     package org.apache.flink.streaming.api.operators;
     
     import org.apache.flink.annotation.Internal;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.streaming.runtime.tasks.EventTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Interface for working with time and timers.
      *
      * <p>This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
      * that allows to specify a key and a namespace to which timers should be scoped.
      *
    + * All d
    + * 
    + * @param <K> Type of the keys in the stream
      * @param <N> Type of the namespace to which timers are scoped.
      */
     @Internal
    -public interface InternalTimerService<N> {
    +public abstract class InternalTimerService<K, N> implements ProcessingTimeCallback, EventTimeCallback {
    +
    +	protected final ProcessingTimeService processingTimeService;
    +
    +	protected final KeyContext keyContext;
    +
    +	protected final int totalKeyGroups;
    +
    +	protected final KeyGroupRange keyGroupRange;
    +
    +	/**
    +	 * The one and only Future (if any) registered to execute the
    +	 * next {@link Triggerable} action, when its (processing) time arrives.
    +	 */
    +	protected ScheduledFuture<?> nextTimer;
    +
    +	/**
    +	 * The local event time, as denoted by the last received
    +	 * {@link org.apache.flink.streaming.api.watermark.Watermark Watermark}.
    +	 */
    +	private long currentWatermark = Long.MIN_VALUE;
    +
    +	// Variables to be set when the service is started.
    +
    +	protected TypeSerializer<K> keySerializer;
    +
    +	protected TypeSerializer<N> namespaceSerializer;
    +
    +	private InternalTimer.TimerSerializer<K, N> timerSerializer;
    +
    +	protected Triggerable<K, N> triggerTarget;
    +
    +	private volatile boolean isInitialized;
    +
    +	public InternalTimerService(
    +			int totalKeyGroups, 
    +			KeyGroupRange keyGroupRange, 
    +			KeyContext keyContext, 
    +			ProcessingTimeService processingTimeService) {
    +		
    +		this.totalKeyGroups = totalKeyGroups;
    +		this.keyGroupRange = checkNotNull(keyGroupRange);
    +		this.keyContext = checkNotNull(keyContext);
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +	}
     
     	/** Returns the current processing time. */
    -	long currentProcessingTime();
    +	public long currentProcessingTime() {
    +		return processingTimeService.getCurrentProcessingTime();
    +	}
     
     	/** Returns the current event-time watermark. */
    -	long currentWatermark();
    +	public long currentWatermark() {
    +		return currentWatermark;
    +	}
     
     	/**
     	 * Registers a timer to be fired when processing time passes the given time. The namespace
     	 * you pass here will be provided when the timer fires.
     	 */
    -	void registerProcessingTimeTimer(N namespace, long time);
    +	abstract public void registerProcessingTimeTimer(N namespace, long time);
     
     	/**
     	 * Deletes the timer for the given key and namespace.
     	 */
    -	void deleteProcessingTimeTimer(N namespace, long time);
    +	abstract public void deleteProcessingTimeTimer(N namespace, long time);
     
     	/**
     	 * Registers a timer to be fired when processing time passes the given time. The namespace
     	 * you pass here will be provided when the timer fires.
     	 */
    -	void registerEventTimeTimer(N namespace, long time);
    +	abstract public void registerEventTimeTimer(N namespace, long time);
     
     	/**
     	 * Deletes the timer for the given key and namespace.
     	 */
    -	void deleteEventTimeTimer(N namespace, long time);
    +	abstract public void deleteEventTimeTimer(N namespace, long time);
    +
    +	/**
    +	 * Returns the timers for the given key group.
    +	 */
    +	abstract public Set<InternalTimer<K, N>> getEventTimeTimersForKeyGroup(int keyGroup);
    +
    +	/**
    +	 * Returns the timers for the given key group.
    +	 */
    +	abstract public Set<InternalTimer<K, N>> getProcessingTimeTimersForKeyGroup(int keyGroup);
    +
    +	/**
    +	 * Restores the timers for the given key group.
    +	 */
    +	abstract public void restoreEventTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> timers);
    +
    +	/**
    +	 * Restores the timers for the given key group.
    +	 */
    +	abstract public void restoreProcessingTimeTimersForKeyGroup(int keyGroup, Iterable<InternalTimer<K, N>> timers);
    +
    +	/**
    +	 * Starts the execution of the timer service
    +	 */
    +	abstract public void start();
    +
    +	/**
    +	 * Closes the timer service.
    +	 */
    +	abstract public void close();
    +	
    +	public void advanceWatermark(long watermark) throws Exception {
    +		if (watermark < currentWatermark) {
    +			throw new IllegalStateException("The watermark is late.");
    +		}
    +		
    +		currentWatermark = watermark;
    +		
    +		onEventTime(watermark);
    +	}
    +
    +	/**
    +	 * Snapshots the timers (both processing and event time ones) for a given {@code keyGroupIdx}.
    +	 * @param stream the stream to write to.
    +	 * @param keyGroupIdx the id of the key-group to be put in the snapshot.
    +	 */
    +	public void snapshotTimersForKeyGroup(DataOutputViewStreamWrapper stream, int keyGroupIdx) throws Exception {
    +		InstantiationUtil.serializeObject(stream, keySerializer);
    +		InstantiationUtil.serializeObject(stream, namespaceSerializer);
    +
    +		// write the event time timers
    +		Collection<InternalTimer<K, N>> eventTimers = getEventTimeTimersForKeyGroup(keyGroupIdx);
    --- End diff --
    
    We could factor out a method that writes a `Collection<InternalTimer<K, N>>` and call it twice, once for the event-time timers and once for the processing-time timers, to de-duplicate the code.
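
A minimal sketch of that refactoring, assuming a simplified stand-in for `InternalTimer<K, N>` with string keys and namespaces. `SimpleTimer`, `writeTimers`, and `snapshot` are hypothetical names that do not appear in the pull request, and the byte layout (a count followed by the timers) is illustrative only, not the format the PR writes.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;

class TimerSnapshotSketch {

    /** Hypothetical stand-in for InternalTimer<K, N>; key and namespace are plain strings. */
    static final class SimpleTimer {
        final long timestamp;
        final String key;
        final String namespace;

        SimpleTimer(long timestamp, String key, String namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }
    }

    /** Shared helper: writes the number of timers, then each timer in iteration order. */
    static void writeTimers(DataOutputStream out, Collection<SimpleTimer> timers) throws IOException {
        out.writeInt(timers.size());
        for (SimpleTimer timer : timers) {
            out.writeLong(timer.timestamp);
            out.writeUTF(timer.key);
            out.writeUTF(timer.namespace);
        }
    }

    /** Snapshots one key group by calling the same helper for both kinds of timers. */
    static byte[] snapshot(Collection<SimpleTimer> eventTimers, Collection<SimpleTimer> processingTimers) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeTimers(out, eventTimers);       // event-time timers first
            writeTimers(out, processingTimers);  // then processing-time timers
        } catch (IOException e) {
            // Not expected for an in-memory stream; rethrown unchecked to keep the sketch short.
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

With a helper of this shape, the restore path could mirror it with a single read method that is also called twice.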


> Implement Internal Timer Service in RocksDB
> -------------------------------------------
>
>                 Key: FLINK-5544
>                 URL: https://issues.apache.org/jira/browse/FLINK-5544
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> Currently, the only implementation of the internal timer service is HeapInternalTimerService,
> which stores all timers in memory. When the number of keys is very large, the timer
> service consumes too much memory. An implementation that stores timers in RocksDB seems
> well suited to these cases.
> Implementing a RocksDB timer service may be somewhat challenging because timers
> are accessed in two different ways. When timers are triggered, we need to access them in
> timestamp order. When performing checkpoints, however, we need a way to obtain all timers
> of a given key group.
> A good implementation, as suggested by [~StephanEwen], follows the idea of merge sorting.
> We can store timers in RocksDB under keys of the form {{KEY_GROUP#TIMER#KEY}}. In this way,
> the timers of a key group are stored together and kept in sorted order.
> We can then maintain an in-memory heap that holds the first timer of each key group and
> use it to find the next timer to trigger. When a key group's first timer changes, the heap
> can be updated efficiently.
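
The idea described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the implementation proposed in the issue: an in-memory `TreeSet` per key group stands in for the sorted RocksDB range, a `TreeSet` of group heads plays the role of the heap so that a stale head can be removed in logarithmic time, and all class and method names (`KeyGroupTimerHeapSketch`, `Entry`, `register`, `advanceTo`, `timersForKeyGroup`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

class KeyGroupTimerHeapSketch {

    /** Hypothetical timer entry identified by key group, timestamp, and key. */
    static final class Entry {
        final int keyGroup;
        final long timestamp;
        final String key;

        Entry(int keyGroup, long timestamp, String key) {
            this.keyGroup = keyGroup;
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    /** Orders entries by timestamp, then key group, then key. */
    static final Comparator<Entry> ORDER = (a, b) -> {
        int c = Long.compare(a.timestamp, b.timestamp);
        if (c != 0) {
            return c;
        }
        c = Integer.compare(a.keyGroup, b.keyGroup);
        return c != 0 ? c : a.key.compareTo(b.key);
    };

    // Stand-in for the per-key-group sorted range in RocksDB.
    private final Map<Integer, TreeSet<Entry>> timersByKeyGroup = new HashMap<>();

    // Holds only the first timer of each non-empty key group.
    private final TreeSet<Entry> heads = new TreeSet<>(ORDER);

    void register(int keyGroup, long timestamp, String key) {
        TreeSet<Entry> group = timersByKeyGroup.computeIfAbsent(keyGroup, g -> new TreeSet<>(ORDER));
        Entry oldHead = group.isEmpty() ? null : group.first();
        group.add(new Entry(keyGroup, timestamp, key));
        Entry newHead = group.first();
        if (newHead != oldHead) {
            // The key group's first timer changed, so update its entry among the heads.
            if (oldHead != null) {
                heads.remove(oldHead);
            }
            heads.add(newHead);
        }
    }

    /** Removes and returns all timers with timestamp <= watermark, in timestamp order. */
    List<Entry> advanceTo(long watermark) {
        List<Entry> fired = new ArrayList<>();
        while (!heads.isEmpty() && heads.first().timestamp <= watermark) {
            Entry head = heads.pollFirst();
            fired.add(head);
            TreeSet<Entry> group = timersByKeyGroup.get(head.keyGroup);
            group.remove(head);
            if (!group.isEmpty()) {
                heads.add(group.first()); // promote the key group's next timer
            }
        }
        return fired;
    }

    /** Returns all timers of one key group, as a checkpoint would need. */
    List<Entry> timersForKeyGroup(int keyGroup) {
        List<Entry> result = new ArrayList<>();
        TreeSet<Entry> group = timersByKeyGroup.get(keyGroup);
        if (group != null) {
            result.addAll(group);
        }
        return result;
    }
}
```

Both access patterns from the description are covered in this sketch: `advanceTo` merges the per-group sorted runs through the heads to fire timers in timestamp order, while `timersForKeyGroup` reads a single group's range without touching the others.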



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
