flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9486) Introduce TimerState in keyed state backend
Date Mon, 09 Jul 2018 15:28:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16537068#comment-16537068 ]

ASF GitHub Bot commented on FLINK-9486:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6276#discussion_r201017209
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -2579,4 +2604,85 @@ public static RocksIteratorWrapper getRocksIterator(
     		ReadOptions readOptions) {
     		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
     	}
    +
    +	/**
    +	 * Encapsulates the logic and resources in connection with creating priority queue state structures.
    +	 */
    +	class RocksDBPriorityQueueFactory implements PriorityQueueSetFactory, AutoCloseable {
    +
    +		/** Default cache size per key-group. */
    +		private static final int DEFAULT_CACHES_SIZE = 8 * 1024;
    +
    +		/** A shared buffer to serialize elements for the priority queue. */
    +		@Nonnull
    +		private final ByteArrayOutputStreamWithPos elementSerializationOutStream;
    +
    +		/** A shared adapter wrapper around elementSerializationOutStream to become a {@link DataOutputView}. */
    +		@Nonnull
    +		private final DataOutputViewStreamWrapper elementSerializationOutView;
    +
    +		/** A shared {@link RocksDBWriteBatchWrapper} to batch modifications to priority queues. */
    +		@Nonnull
    +		private final RocksDBWriteBatchWrapper writeBatchWrapper;
    +
    +		/** Map to track all column families created to back priority queues. */
    +		@Nonnull
    +		private final Map<String, ColumnFamilyHandle> priorityQueueColumnFamilies;
    +
    +		RocksDBPriorityQueueFactory() {
    +			this.elementSerializationOutStream = new ByteArrayOutputStreamWithPos();
    +			this.elementSerializationOutView = new DataOutputViewStreamWrapper(elementSerializationOutStream);
    +			this.writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions);
    +			this.priorityQueueColumnFamilies = new HashMap<>();
    +		}
    +
    +		@Override
    +		public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
    +			@Nonnull String stateName,
    +			@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
    +			@Nonnull Comparator<T> elementComparator,
    +			@Nonnull KeyExtractorFunction<T> keyExtractor) {
    +
    +			final ColumnFamilyHandle columnFamilyHandle =
    +				priorityQueueColumnFamilies.computeIfAbsent(stateName, RocksDBKeyedStateBackend.this::createColumnFamily);
    +
    +			return new KeyGroupPartitionedPriorityQueue<>(
    +				keyExtractor,
    +				elementComparator,
    +				new KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, CachingInternalPriorityQueueSet<T>>() {
    +					@Nonnull
    +					@Override
    +					public CachingInternalPriorityQueueSet<T> create(
    +						int keyGroupId,
    +						int numKeyGroups,
    +						@Nonnull Comparator<T> elementComparator) {
    --- End diff --
    
    Indentation
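The diff wires a KeyGroupPartitionedPriorityQueue to a PartitionQueueSetFactory that builds one queue set per key-group. As a rough illustration of that partitioning idea, here is a minimal, hypothetical Java sketch (it is not Flink's implementation): each key-group gets its own java.util.PriorityQueue, a HashSet per key-group gives set semantics so that registering the same timer twice is a no-op, and the global head is the smallest of the per-key-group heads. The class KeyGroupPartitionedQueueSketch, its Timer element type and the plain hashCode-modulo key-group assignment are assumptions made for the example; Flink's real key-group assignment scrambles the key's hash before taking the modulo.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch, not Flink's KeyGroupPartitionedPriorityQueue:
// one heap per key-group, with set semantics (duplicates are ignored).
public class KeyGroupPartitionedQueueSketch<T, K> {

    /** Hypothetical timer element: a key plus a timestamp. */
    public static final class Timer {
        public final String key;
        public final long timestamp;

        public Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Timer)) {
                return false;
            }
            Timer other = (Timer) o;
            return timestamp == other.timestamp && key.equals(other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, timestamp);
        }
    }

    private final List<PriorityQueue<T>> heaps = new ArrayList<>();
    private final List<Set<T>> members = new ArrayList<>();
    private final Function<T, K> keyExtractor;
    private final Comparator<T> comparator;
    private final int numKeyGroups;

    public KeyGroupPartitionedQueueSketch(
            Function<T, K> keyExtractor, Comparator<T> comparator, int numKeyGroups) {
        this.keyExtractor = keyExtractor;
        this.comparator = comparator;
        this.numKeyGroups = numKeyGroups;
        for (int i = 0; i < numKeyGroups; i++) {
            heaps.add(new PriorityQueue<>(comparator));
            members.add(new HashSet<>());
        }
    }

    /** Simplified key-group assignment (assumption): plain hashCode modulo. */
    int keyGroupOf(T element) {
        return Math.floorMod(keyExtractor.apply(element).hashCode(), numKeyGroups);
    }

    /** Adds the element to its key-group unless an equal element is already present. */
    public boolean add(T element) {
        int keyGroup = keyGroupOf(element);
        if (!members.get(keyGroup).add(element)) {
            return false;
        }
        heaps.get(keyGroup).add(element);
        return true;
    }

    /** Returns the smallest element across all key-groups (linear scan of heads), or null. */
    public T peek() {
        T best = null;
        for (PriorityQueue<T> heap : heaps) {
            T head = heap.peek();
            if (head != null && (best == null || comparator.compare(head, best) < 0)) {
                best = head;
            }
        }
        return best;
    }

    /** Removes and returns the smallest element across all key-groups, or null. */
    public T poll() {
        T best = peek();
        if (best != null) {
            int keyGroup = keyGroupOf(best);
            // best is the head of its own key-group's heap, so poll() removes exactly it.
            heaps.get(keyGroup).poll();
            members.get(keyGroup).remove(best);
        }
        return best;
    }
}
```

Scanning every key-group head in peek() costs O(numKeyGroups) per call; a production implementation would more plausibly keep the partition heads in a second heap, which this sketch leaves out for brevity.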


> Introduce TimerState in keyed state backend
> -------------------------------------------
>
>                 Key: FLINK-9486
>                 URL: https://issues.apache.org/jira/browse/FLINK-9486
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>
> This is the first implementation subtask.
> The goal of this PR is to introduce a timer state that is registered with the keyed state backend, similar to other forms of keyed state.
> For the {{HeapKeyedStateBackend}}, this state lives on the same level as the {{StateTable}} instances that hold other forms of keyed state, and the implementation is backed by {{InternalTimerHeap}}.
> For the {{RocksDBKeyedStateBackend}}, this first step also introduces the state outside of RocksDB, again based on {{InternalTimerHeap}}. This is an intermediate step: we will later also implement the alternative of storing the timers inside column families in RocksDB. Having this implementation also means we can continue to offer RocksDB state with heap-based timers as an option.
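Storing timers inside a RocksDB column family, as planned above, means the timer order has to be expressible as RocksDB key order, which under the default bytewise comparator is unsigned lexicographic order over the key bytes. That is presumably why the factory in the diff asks for a byteOrderedElementSerializer. The following hypothetical Java sketch shows one such encoding for a timer made of a key-group, a timestamp and a string key. The layout (2-byte key-group prefix, 8-byte big-endian timestamp with the sign bit flipped, then the UTF-8 key) and the class name ByteOrderedTimerCodec are assumptions for illustration, not Flink's actual on-disk format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical byte-ordered encoding for a timer (keyGroup, timestamp, key).
// Goal: unsigned lexicographic comparison of the encoded bytes agrees with
// ordering by key-group first and timestamp second. Layout is an assumption.
public final class ByteOrderedTimerCodec {

    private static final int KEY_GROUP_BYTES = 2;
    private static final int TIMESTAMP_BYTES = 8;

    private ByteOrderedTimerCodec() {
    }

    /** Encodes (keyGroup, timestamp, key); assumes 0 <= keyGroup <= 0xFFFF. */
    public static byte[] encode(int keyGroup, long timestamp, String key) {
        if (keyGroup < 0 || keyGroup > 0xFFFF) {
            throw new IllegalArgumentException("key-group out of range: " + keyGroup);
        }
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // ByteBuffer writes big-endian by default: most significant byte first.
        ByteBuffer buffer = ByteBuffer.allocate(KEY_GROUP_BYTES + TIMESTAMP_BYTES + keyBytes.length);
        buffer.putShort((short) keyGroup);
        // Flipping the sign bit makes unsigned byte order agree with signed long order.
        buffer.putLong(timestamp ^ Long.MIN_VALUE);
        buffer.put(keyBytes);
        return buffer.array();
    }

    /** Recovers the timestamp from an encoded timer. */
    public static long decodeTimestamp(byte[] encoded) {
        return ByteBuffer.wrap(encoded, KEY_GROUP_BYTES, TIMESTAMP_BYTES).getLong() ^ Long.MIN_VALUE;
    }

    /** Unsigned lexicographic comparison, i.e. the order a bytewise comparator uses. */
    public static int compareUnsigned(byte[] a, byte[] b) {
        int common = Math.min(a.length, b.length);
        for (int i = 0; i < common; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        // A strict prefix sorts first.
        return Integer.compare(a.length, b.length);
    }
}
```

Flipping the sign bit maps Long.MIN_VALUE to all-zero bytes and Long.MAX_VALUE to all-0xFF bytes, so negative timestamps sort before positive ones under an unsigned byte comparison. With the key-group prefix first, all timers of one key-group are contiguous and, within it, ordered by timestamp.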



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
