flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9981) Tune performance of RocksDB implementation
Date Tue, 31 Jul 2018 13:47:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16563688#comment-16563688 ]

ASF GitHub Bot commented on FLINK-9981:
---------------------------------------

azagrebin commented on a change in pull request #6438: [FLINK-9981] Tune performance of RocksDB implementation
URL: https://github.com/apache/flink/pull/6438#discussion_r206506427
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/MinMaxPriorityQueueOrderedSetCache.java
 ##########
 @@ -41,95 +38,86 @@
  *
  * @param <E> type of the contained elements.
  */
-public class TreeOrderedSetCache<E> implements CachingInternalPriorityQueueSet.OrderedSetCache<E> {
+public class MinMaxPriorityQueueOrderedSetCache<E extends HeapPriorityQueueElement>
+	implements CachingInternalPriorityQueueSet.OrderedSetCache<E> {
 
 	/** The tree is used to store cached elements. */
 	@Nonnull
-	private final ObjectAVLTreeSet<E> avlTree;
+	private final HeapMinMaxPriorityQueueSet<E> minMaxHeapSet;
 
 	/** The element comparator. */
 	@Nonnull
-	private final Comparator<E> elementComparator;
+	private final PriorityComparator<E> priorityComparator;
 
 	/** The maximum capacity of the cache. */
 	@Nonnegative
 	private final int capacity;
 
 	/**
-	 * Creates a new {@link TreeOrderedSetCache} with the given capacity and element comparator. Capacity must be > 0.
-	 * @param elementComparator comparator for the cached elements.
+	 * Creates a new {@link MinMaxPriorityQueueOrderedSetCache} with the given capacity and element comparator. Capacity must be > 0.
+	 * @param priorityComparator comparator for the cached elements.
 	 * @param capacity the capacity of the cache. Must be > 0.
 	 */
-	public TreeOrderedSetCache(@Nonnull Comparator<E> elementComparator, @Nonnegative int capacity) {
+	public MinMaxPriorityQueueOrderedSetCache(@Nonnull PriorityComparator<E> priorityComparator, @Nonnegative int capacity) {
 		Preconditions.checkArgument(capacity > 0, "Cache capacity must be greater than 0.");
-		this.avlTree = new ObjectAVLTreeSet<>(elementComparator);
-		this.elementComparator = elementComparator;
+		this.minMaxHeapSet = new HeapMinMaxPriorityQueueSet<>(priorityComparator, capacity);
+		this.priorityComparator = priorityComparator;
 		this.capacity = capacity;
 	}
 
 	@Override
 	public void add(@Nonnull E element) {
 		assert !isFull();
-		avlTree.add(element);
+		minMaxHeapSet.add(element);
 	}
 
 	@Override
 	public void remove(@Nonnull E element) {
-		avlTree.remove(element);
+		minMaxHeapSet.remove(element);
 	}
 
 	@Override
 	public boolean isFull() {
-		return avlTree.size() == capacity;
+		return minMaxHeapSet.size() == capacity;
 	}
 
 	@Override
 	public boolean isEmpty() {
-		return avlTree.isEmpty();
+		return minMaxHeapSet.isEmpty();
 	}
 
 	@Override
 	public boolean isInLowerBound(@Nonnull E toCheck) {
-		return avlTree.isEmpty() || elementComparator.compare(peekLast(), toCheck) > 0;
+		return minMaxHeapSet.isEmpty() || priorityComparator.comparePriority(peekLast(), toCheck) >= 0;
 	}
 
 	@Nullable
 	@Override
 	public E removeFirst() {
-		if (avlTree.isEmpty()) {
-			return null;
-		}
-		final E first = avlTree.first();
-		avlTree.remove(first);
-		return first;
+		return minMaxHeapSet.pollFirst();
 	}
 
 	@Nullable
 	@Override
 	public E removeLast() {
-		if (avlTree.isEmpty()) {
-			return null;
-		}
-		final E last = avlTree.last();
-		avlTree.remove(last);
-		return last;
+		return minMaxHeapSet.pollLast();
 	}
 
 	@Nullable
 	@Override
 	public E peekFirst() {
-		return !avlTree.isEmpty() ? avlTree.first() : null;
+		return minMaxHeapSet.peekFirst();
 	}
 
 	@Nullable
 	@Override
 	public E peekLast() {
-		return !avlTree.isEmpty() ? avlTree.last() : null;
+		return minMaxHeapSet.peekLast();
 	}
 
 	@Nonnull
 	@Override
 	public CloseableIterator<E> orderedIterator() {
-		return CloseableIterator.adapterForIterator(avlTree.iterator());
+		return minMaxHeapSet.iterator();
 
 Review comment:
   I think the underlying iterator is no longer ordered, and we do not rely on the ordering
of this interface method anywhere. In that case, the method could be renamed.
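
   To illustrate the point, here is a minimal, self-contained sketch. It does not use the
PR's HeapMinMaxPriorityQueueSet; java.util.PriorityQueue stands in for it as an assumption,
since both are array-backed heaps, and the class and method names below are hypothetical.
A sorted tree iterates in comparator order, whereas a heap's iterator makes no ordering
guarantee; only repeated polling yields the elements in order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;
import java.util.TreeSet;

/** Hypothetical demo class; not part of Flink. */
public class IterationOrderDemo {

	/** Collects elements in the order the heap's iterator yields them (no ordering guarantee). */
	public static List<Integer> heapIterationOrder(List<Integer> input) {
		PriorityQueue<Integer> heap = new PriorityQueue<>();
		for (Integer value : input) {
			heap.add(value);
		}
		List<Integer> result = new ArrayList<>();
		for (Integer value : heap) {
			result.add(value);
		}
		return result;
	}

	/** Collects elements in the order a sorted tree's iterator yields them (ascending). */
	public static List<Integer> treeIterationOrder(List<Integer> input) {
		return new ArrayList<>(new TreeSet<>(input));
	}

	/** Drains the heap by polling, which does return the elements in ascending order. */
	public static List<Integer> drainInOrder(List<Integer> input) {
		PriorityQueue<Integer> heap = new PriorityQueue<>(input);
		List<Integer> result = new ArrayList<>();
		while (!heap.isEmpty()) {
			result.add(heap.poll());
		}
		return result;
	}

	public static void main(String[] args) {
		List<Integer> input = Arrays.asList(5, 1, 4, 2, 3);
		System.out.println("tree iterator: " + treeIterationOrder(input));
		System.out.println("heap iterator: " + heapIterationOrder(input));
		System.out.println("heap polling:  " + drainInOrder(input));
	}
}
```

   If no caller depends on ordered iteration, a neutral method name would avoid promising
an order that a heap-backed cache cannot cheaply provide.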

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Tune performance of RocksDB implementation
> ------------------------------------------
>
>                 Key: FLINK-9981
>                 URL: https://issues.apache.org/jira/browse/FLINK-9981
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>
> General performance tuning/polishing for the RocksDB implementation. We can figure out how caching/seeking can be improved.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
