flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] azagrebin commented on a change in pull request #6438: [FLINK-9981] Tune performance of RocksDB implementation
Date Wed, 01 Aug 2018 18:33:27 GMT
azagrebin commented on a change in pull request #6438: [FLINK-9981] Tune performance of RocksDB
implementation
URL: https://github.com/apache/flink/pull/6438#discussion_r206966986
 
 

 ##########
 File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
 ##########
 @@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.flink.shaded.guava18.com.google.common.primitives.UnsignedBytes;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.TreeSet;
+
+import static org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.OrderedByteArraySetCache.LEXICOGRAPHIC_BYTE_COMPARATOR;
+
+/**
+ * A priority queue with set semantics, implemented on top of RocksDB. This uses a {@link TreeSet} to cache the
+ * bytes of up to the first n elements from RocksDB in memory to reduce interaction with RocksDB, in particular
+ * seek operations. The cache uses a simple write-through policy.
+ *
+ * @param <E> the type of the contained elements in the queue.
+ */
+public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
+	implements InternalPriorityQueue<E>, HeapPriorityQueueElement {
+
+	/** Serialized empty value to insert into RocksDB. */
+	private static final byte[] DUMMY_BYTES = new byte[] {};
+
+	/** The RocksDB instance that serves as store. */
+	@Nonnull
+	private final RocksDB db;
+
+	/** Handle to the column family of the RocksDB instance in which the elements are stored. */
+	@Nonnull
+	private final ColumnFamilyHandle columnFamilyHandle;
+
+	/**
+	 * Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be
+	 * aligned with their logical order.
+	 */
+	@Nonnull
+	private final TypeSerializer<E> byteOrderProducingSerializer;
+
+	/** Wrapper to batch all writes to RocksDB. */
+	@Nonnull
+	private final RocksDBWriteBatchWrapper batchWrapper;
+
+	/** The key-group id in serialized form. */
+	@Nonnull
+	private final byte[] groupPrefixBytes;
+
+	/** Output stream that helps to serialize elements. */
+	@Nonnull
+	private final ByteArrayOutputStreamWithPos outputStream;
+
+	/** Output view that helps to serialize elements, must wrap the output stream. */
+	@Nonnull
+	private final DataOutputViewStreamWrapper outputView;
+
+	@Nonnull
+	private final ByteArrayInputStreamWithPos inputStream;
+
+	@Nonnull
+	private final DataInputViewStreamWrapper inputView;
+
+	/** In memory cache that holds a partial view on the head of the RocksDB content. */
+	@Nonnull
+	private final OrderedByteArraySetCache orderedCache;
+
+	/** This holds the key that we use to seek to the first element in RocksDB, to improve seek/iterator performance. */
+	@Nonnull
+	private byte[] seekHint;
+
+	/** Cache for the head element in de-serialized form. */
+	@Nullable
+	private E peekCache;
+
+	/** This flag is true if there could be elements in the backend that are not in the cache (false positives ok). */
+	private boolean storeOnlyElements;
+
+	/** Index for management as a {@link HeapPriorityQueueElement}. */
+	private int internalIndex;
+
+	/**
+	 * Creates a queue-set that stores its elements in the given column family, prefixed by the serialized key-group.
+	 *
+	 * @param keyGroupId the key-group id that is serialized into the key prefix.
+	 * @param keyGroupPrefixBytes the number of bytes used to write the key-group prefix.
+	 * @param db the RocksDB instance that serves as store.
+	 * @param columnFamilyHandle handle to the column family that holds the elements.
+	 * @param byteOrderProducingSerializer serializer for the elements.
+	 * @param outputStream stream used for serialization; wrapped by a new output view.
+	 * @param inputStream stream used for de-serialization; wrapped by a new input view.
+	 * @param batchWrapper wrapper through which all writes and deletes are issued.
+	 * @param orderedByteArraySetCache the in-memory cache for the head of the queue.
+	 */
+	RocksDBCachingPriorityQueueSet(
+		@Nonnegative int keyGroupId,
+		@Nonnegative int keyGroupPrefixBytes,
+		@Nonnull RocksDB db,
+		@Nonnull ColumnFamilyHandle columnFamilyHandle,
+		@Nonnull TypeSerializer<E> byteOrderProducingSerializer,
+		@Nonnull ByteArrayOutputStreamWithPos outputStream,
+		@Nonnull ByteArrayInputStreamWithPos inputStream,
+		@Nonnull RocksDBWriteBatchWrapper batchWrapper,
+		@Nonnull OrderedByteArraySetCache orderedByteArraySetCache) {
+
+		// backing store and write path
+		this.db = db;
+		this.columnFamilyHandle = columnFamilyHandle;
+		this.batchWrapper = batchWrapper;
+
+		// (de)serialization helpers; the output stream and view must be set before createKeyGroupBytes() is called
+		this.byteOrderProducingSerializer = byteOrderProducingSerializer;
+		this.outputStream = outputStream;
+		this.outputView = new DataOutputViewStreamWrapper(outputStream);
+		this.inputStream = inputStream;
+		this.inputView = new DataInputViewStreamWrapper(inputStream);
+
+		// cache state: start with the assumption that the store may hold elements the cache does not
+		this.orderedCache = orderedByteArraySetCache;
+		this.storeOnlyElements = true;
+
+		// key prefix, which is also the initial seek position
+		this.groupPrefixBytes = createKeyGroupBytes(keyGroupId, keyGroupPrefixBytes);
+		this.seekHint = groupPrefixBytes;
+
+		this.internalIndex = HeapPriorityQueueElement.NOT_CONTAINED;
+	}
+
+	/**
+	 * Returns the head of the queue without removing it, or null if there is no element. The de-serialized head is
+	 * kept in {@code peekCache} so that repeated calls do not de-serialize again.
+	 */
+	@Nullable
+	@Override
+	public E peek() {
+
+		checkRefillCacheFromStore();
+
+		if (peekCache == null) {
+			final byte[] headBytes = orderedCache.peekFirst();
+			if (headBytes == null) {
+				// nothing cached after the refill check
+				return null;
+			}
+			peekCache = deserializeElement(headBytes);
+		}
+
+		return peekCache;
+	}
+
+	/**
+	 * Removes and returns the head of the queue, or returns null if there is no element. The removal is also
+	 * issued against RocksDB (write-through).
+	 */
+	@Nullable
+	@Override
+	public E poll() {
+
+		checkRefillCacheFromStore();
+
+		final byte[] headBytes = orderedCache.pollFirst();
+		if (headBytes == null) {
+			return null;
+		}
+
+		// write-through sync
+		removeFromRocksDB(headBytes);
+
+		// once the cache has run empty, remember the last polled key as the seek position
+		if (orderedCache.isEmpty()) {
+			seekHint = headBytes;
+		}
+
+		final E cachedHead = peekCache;
+		if (cachedHead == null) {
+			return deserializeElement(headBytes);
+		}
+
+		// the cached head object was just removed, so it must not be served by peek() again
+		peekCache = null;
+		return cachedHead;
+	}
+
+	/**
+	 * Adds the element to RocksDB and, if it falls into the cached range, to the in-memory cache as well.
+	 *
+	 * @param toAdd the element to insert.
+	 * @return true only if the serialized element was inserted into the cache and is now the cache's first entry;
+	 *     false in all other cases.
+	 */
+	@Override
+	public boolean add(@Nonnull E toAdd) {
+
+		checkRefillCacheFromStore();
+
+		final byte[] toAddBytes = serializeElement(toAdd);
+
+		final boolean cacheFull = orderedCache.isFull();
+
+		// the element goes through the cache if the cache has room and no store-only elements are flagged,
+		// or if it sorts before the current last cache entry
+		if ((!cacheFull && !storeOnlyElements) ||
+			LEXICOGRAPHIC_BYTE_COMPARATOR.compare(toAddBytes, orderedCache.peekLast()) < 0) {
+
+			if (cacheFull) {
+				// we drop the element with lowest priority from the cache
+				orderedCache.pollLast();
+				// the dropped element is now only in the store
+				storeOnlyElements = true;
+			}
+
+			// the store is only written when the cache reports an insertion (presumably false for duplicates)
+			if (orderedCache.add(toAddBytes)) {
+				// write-through sync
+				addToRocksDB(toAddBytes);
+				// reference comparison: true only if the array instance just inserted is now the head
+				if (toAddBytes == orderedCache.peekFirst()) {
+					peekCache = null;
+					return true;
+				}
+			}
+		} else {
+			// we only added to the store
+			addToRocksDB(toAddBytes);
+			storeOnlyElements = true;
+		}
+		return false;
+	}
+
+	/**
+	 * Removes the element from RocksDB and from the in-memory cache.
+	 *
+	 * @param toRemove the element to remove.
+	 * @return true if the cache became empty or its first entry changed because of the removal; false otherwise,
+	 *     including the case that the cache holds no element after the refill check.
+	 */
+	@Override
+	public boolean remove(@Nonnull E toRemove) {
+
+		checkRefillCacheFromStore();
+
+		// remember the head before the removal so that a head change can be detected afterwards
+		final byte[] oldHead = orderedCache.peekFirst();
+
+		if (oldHead == null) {
+			return false;
+		}
+
+		final byte[] toRemoveBytes = serializeElement(toRemove);
+
+		// write-through sync
+		removeFromRocksDB(toRemoveBytes);
+		orderedCache.remove(toRemoveBytes);
+
+		if (orderedCache.isEmpty()) {
+			// the removed key becomes the new seek position and the cached head object is dropped
+			seekHint = toRemoveBytes;
+			peekCache = null;
+			return true;
+		}
+
+		// reference comparison: a different array instance at the head means the old head was removed
+		if (oldHead != orderedCache.peekFirst()) {
+			peekCache = null;
+			return true;
+		}
+
+		return false;
+	}
+
+	/**
+	 * Adds every element of the given collection through {@link #add}. A null collection is ignored.
+	 *
+	 * @param toAdd the elements to insert, may be null.
+	 */
+	@Override
+	public void addAll(@Nullable Collection<? extends E> toAdd) {
+
+		if (toAdd != null) {
+			for (E candidate : toAdd) {
+				add(candidate);
+			}
+		}
+	}
+
+	/**
+	 * Reports whether the cache is empty after the refill check has run, i.e. after the cache was reloaded from
+	 * the store if it was empty and store-only elements were flagged.
+	 */
+	@Override
+	public boolean isEmpty() {
+		checkRefillCacheFromStore();
+		return orderedCache.isEmpty();
+	}
+
+	/**
+	 * Returns an iterator that wraps the ordered byte iterator over the store in a
+	 * {@code DeserializingIteratorWrapper}. Obtaining the byte iterator flushes the write batch first.
+	 */
+	@Nonnull
+	@Override
+	public CloseableIterator<E> iterator() {
+		return new DeserializingIteratorWrapper(orderedBytesIterator());
+	}
+
+	/**
+	 * Returns the number of elements. While store-only elements are flagged, this walks a full iterator over the
+	 * store and counts, which is relatively expensive per invocation; otherwise the cache size is returned. It
+	 * should not be called repeatedly when it is clear that the value did not change. Currently this is only truly
+	 * used to realize certain higher-level tests.
+	 */
+	@Override
+	public int size() {
+
+		if (!storeOnlyElements) {
+			// cheap path: answer from the cache alone
+			return orderedCache.size();
+		}
+
+		int numElements = 0;
+		try (final RocksBytesIterator storeIterator = orderedBytesIterator()) {
+			while (storeIterator.hasNext()) {
+				storeIterator.next();
+				numElements++;
+			}
+		}
+		return numElements;
+	}
+
+	/** Returns the index under which this queue is managed as a {@link HeapPriorityQueueElement}. */
+	@Override
+	public int getInternalIndex() {
+		return internalIndex;
+	}
+
+	/**
+	 * Stores the index under which this queue is managed as a {@link HeapPriorityQueueElement}.
+	 *
+	 * @param newIndex the new index value.
+	 */
+	@Override
+	public void setInternalIndex(int newIndex) {
+		this.internalIndex = newIndex;
+	}
+
+	@Nonnull
+	private RocksBytesIterator orderedBytesIterator() {
+		flushWriteBatch();
+		return new RocksBytesIterator(
+			new RocksIteratorWrapper(
+				db.newIterator(columnFamilyHandle)));
+	}
+
+	/**
+	 * Ensures that recent writes are flushed and reflected in the RocksDB instance.
+	 *
+	 * @throws FlinkRuntimeException wrapping any {@link RocksDBException} raised by the flush.
+	 */
+	private void flushWriteBatch() {
+		try {
+			batchWrapper.flush();
+		} catch (RocksDBException e) {
+			throw new FlinkRuntimeException(e);
+		}
+	}
+
+	/**
+	 * Puts the given key with an empty value into the column family through the write batch wrapper.
+	 *
+	 * @param toAddBytes the serialized element that is used as the key.
+	 * @throws FlinkRuntimeException wrapping any {@link RocksDBException} raised by the put.
+	 */
+	private void addToRocksDB(@Nonnull byte[] toAddBytes) {
+		try {
+			batchWrapper.put(columnFamilyHandle, toAddBytes, DUMMY_BYTES);
+		} catch (RocksDBException e) {
+			throw new FlinkRuntimeException(e);
+		}
+	}
+
+	/**
+	 * Removes the given key from the column family through the write batch wrapper.
+	 *
+	 * @param toRemoveBytes the serialized element whose key is removed.
+	 * @throws FlinkRuntimeException wrapping any {@link RocksDBException} raised by the removal.
+	 */
+	private void removeFromRocksDB(@Nonnull byte[] toRemoveBytes) {
+		try {
+			batchWrapper.remove(columnFamilyHandle, toRemoveBytes);
+		} catch (RocksDBException e) {
+			throw new FlinkRuntimeException(e);
+		}
+	}
+
+	/**
+	 * Reloads the cache from the store, but only if the cache is empty while store-only elements are flagged.
+	 * Afterwards the flag reflects whether the iterator still had elements left after the bulk load.
+	 */
+	private void checkRefillCacheFromStore() {
+		if (!storeOnlyElements || !orderedCache.isEmpty()) {
+			// nothing to refill from, or the cache still holds elements
+			return;
+		}
+
+		try (final RocksBytesIterator storeIterator = orderedBytesIterator()) {
+			orderedCache.bulkLoadFromOrderedIterator(storeIterator);
+			// whatever the iterator has left did not make it into the cache
+			storeOnlyElements = storeIterator.hasNext();
+		} catch (Exception e) {
+			throw new FlinkRuntimeException("Exception while refilling store from iterator.", e);
+		}
+	}
+
+	/**
+	 * Checks whether {@code bytes} starts with {@code prefixBytes}.
+	 *
+	 * @param bytes the array to test.
+	 * @param prefixBytes the expected prefix.
+	 * @return true if every byte of {@code prefixBytes} equals the byte at the same position in {@code bytes};
+	 *     false otherwise, including the case that {@code bytes} is shorter than {@code prefixBytes}.
+	 */
+	private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) {
+		// an array that is shorter than the prefix cannot start with it; without this guard the loop below
+		// would read past the end of bytes
+		if (bytes.length < prefixBytes.length) {
+			return false;
+		}
+		for (int i = 0; i < prefixBytes.length; ++i) {
+			if (bytes[i] != prefixBytes[i]) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	/**
+	 * Serializes the key-group id into a fresh byte array, using the shared output stream and view.
+	 *
+	 * @param keyGroupId the key-group id to write.
+	 * @param numPrefixBytes the number of bytes passed to the key-group writer.
+	 * @return a copy of the bytes written to the output stream.
+	 * @throws FlinkRuntimeException if writing the key-group fails with an {@link IOException}.
+	 */
+	@Nonnull
+	private byte[] createKeyGroupBytes(int keyGroupId, int numPrefixBytes) {
+		try {
+			// start from an empty buffer so that only the key-group bytes end up in the result
+			outputStream.reset();
+			RocksDBKeySerializationUtils.writeKeyGroup(keyGroupId, numPrefixBytes, outputView);
+			return outputStream.toByteArray();
+		} catch (IOException e) {
+			throw new FlinkRuntimeException("Could not write key-group bytes.", e);
+		}
+	}
+
+	/**
+	 * Serializes the element into a fresh byte array that starts with the key-group prefix bytes.
+	 *
+	 * @param element the element to serialize.
+	 * @return the key-group prefix followed by the serialized element.
+	 * @throws FlinkRuntimeException if serialization fails with an {@link IOException}.
+	 */
+	@Nonnull
+	private byte[] serializeElement(@Nonnull E element) {
+		// the shared buffer is reused across calls, so it is cleared first
+		outputStream.reset();
+
+		try {
+			outputView.write(groupPrefixBytes);
+			byteOrderProducingSerializer.serialize(element, outputView);
+		} catch (IOException e) {
+			throw new FlinkRuntimeException("Error while serializing the element.", e);
+		}
+
+		return outputStream.toByteArray();
+	}
+
+	@Nonnull
+	private E deserializeElement(@Nonnull byte[] bytes) {
+		try {
+			inputStream.setBuffer(bytes, 0, bytes.length);
+			inputView.skipBytes(groupPrefixBytes.length);
 
 Review comment:
   Can the first 2 lines be reduced to:
   inputStream.setBuffer(bytes, **groupPrefixBytes.length**, bytes.length);?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message