flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9491) Implement timer data structure based on RocksDB
Date Tue, 03 Jul 2018 13:58:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531451#comment-16531451
] 

ASF GitHub Bot commented on FLINK-9491:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199816750
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
---
    @@ -0,0 +1,452 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Comparator;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ThreadLocalRandom;
    +
    +/**
    + * Testbase for implementations of {@link InternalPriorityQueue}.
    + */
    +public abstract class InternalPriorityQueueTestBase extends TestLogger {
    +
    +	protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
    +	protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION
= TestElement::getKey;
    +	protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
    +		Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
    +
    +	protected static void insertRandomTimers(
    +		@Nonnull InternalPriorityQueue<TestElement> priorityQueue,
    +		@Nonnull Set<TestElement> checkSet,
    +		int count) {
    +
    +		ThreadLocalRandom localRandom = ThreadLocalRandom.current();
    +
    +		final int numUniqueKeys = Math.max(count / 4, 64);
    +
    +		long duplicatePriority = Long.MIN_VALUE;
    +
    +		for (int i = 0; i < count; ++i) {
    +			TestElement element;
    +			do {
    +				long elementPriority;
    +				if (duplicatePriority == Long.MIN_VALUE) {
    +					elementPriority = localRandom.nextLong();
    +				} else {
    +					elementPriority = duplicatePriority;
    +					duplicatePriority = Long.MIN_VALUE;
    +				}
    +				element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
    +			} while (!checkSet.add(element));
    +
    +			if (localRandom.nextInt(10) == 0) {
    +				duplicatePriority = element.getPriority();
    +			}
    +
    +			final boolean headChangedIndicated = priorityQueue.add(element);
    +			if (element.equals(priorityQueue.peek())) {
    +				Assert.assertTrue(headChangedIndicated);
    +			}
    +		}
    +		Assert.assertEquals(count, priorityQueue.size());
    +	}
    +
    +	@Test
    +	public void testPeekPollOrder() {
    +		final int initialCapacity = 4;
    +		final int testSize = 1000;
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(initialCapacity);
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		long lastPriorityValue = Long.MIN_VALUE;
    +		int lastSize = priorityQueue.size();
    +		Assert.assertEquals(testSize, lastSize);
    +		TestElement testElement;
    +		while ((testElement = priorityQueue.peek()) != null) {
    +			Assert.assertFalse(priorityQueue.isEmpty());
    +			Assert.assertEquals(lastSize, priorityQueue.size());
    +			Assert.assertEquals(testElement, priorityQueue.poll());
    +			Assert.assertTrue(checkSet.remove(testElement));
    +			Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
    +			lastPriorityValue = testElement.getPriority();
    +			--lastSize;
    +		}
    +
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +		Assert.assertEquals(0, priorityQueue.size());
    +		Assert.assertEquals(0, checkSet.size());
    +	}
    +
    +	@Test
    +	public void testStopInsertMixKeepsOrder() {
    --- End diff --
    
    👍 


> Implement timer data structure based on RocksDB
> -----------------------------------------------
>
>                 Key: FLINK-9491
>                 URL: https://issues.apache.org/jira/browse/FLINK-9491
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>
> We can now implement timer state that is stored in RocksDB for users that run the {{RocksDBKeyedStateBackend}}.
As explained in the design document (https://docs.google.com/document/d/1XbhJRbig5c5Ftd77d0mKND1bePyTC26Pz04EvxdA7Jc/edit#heading=h.17v0k3363r6q)
this should also give us asynchronous and incremental snapshots for timer state that is larger
than main memory.
> We need to think about a way in which to user can select either to run timers on RocksDB
or on the heap when using the {{RocksDBKeyedStateBackend}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message