flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject flink git commit: [FLINK-7683][state backends] Introduce iterator for keys in KeyedStateBackend. This closes #4722.
Date Thu, 28 Sep 2017 14:37:38 GMT
Repository: flink
Updated Branches:
  refs/heads/master 685ca8af8 -> 3ff059299


[FLINK-7683][state backends] Introduce iterator for keys in KeyedStateBackend.
This closes #4722.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ff05929
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ff05929
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ff05929

Branch: refs/heads/master
Commit: 3ff059299b7ad27d005fb4e0a89689c72eeb5c0e
Parents: 685ca8a
Author: Piotr Nowojski <piotr.nowojski@gmail.com>
Authored: Thu Sep 14 12:39:30 2017 +0200
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Thu Sep 28 16:36:33 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 61 ++++++++++++++++++++
 .../flink/runtime/state/KeyedStateBackend.java  | 10 ++++
 .../state/heap/CopyOnWriteStateTable.java       | 10 ++++
 .../state/heap/HeapKeyedStateBackend.java       | 10 ++++
 .../state/heap/NestedMapsStateTable.java        | 11 ++++
 .../flink/runtime/state/heap/StateTable.java    |  4 ++
 .../runtime/state/StateBackendTestBase.java     | 38 +++++++++++-
 .../state/heap/HeapStateBackendTestBase.java    | 21 ++++---
 8 files changed, 156 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index a1500c7..f6ed87d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -38,6 +38,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
@@ -72,6 +73,7 @@ import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
@@ -100,8 +102,10 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Set;
@@ -111,6 +115,8 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.RunnableFuture;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 /**
  * A {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and will
serialize state to
@@ -252,6 +258,21 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier,
this.backendUID);
 	}
 
+	/**
+	 * Returns the keys stored in the column family registered under {@code state}, or an empty
+	 * stream if no such state is registered.
+	 *
+	 * <p>NOTE(review): {@code namespace} is never used in this method, so the stream yields the key of
+	 * every row in the column family regardless of namespace. If the RocksDB row key also encodes the
+	 * namespace (presumably key-group prefix + key + namespace; confirm against the code that writes
+	 * row keys), a key stored under several namespaces is returned once per namespace. Fixing this
+	 * needs a filter on the serialized namespace bytes.
+	 *
+	 * <p>The {@link RocksIterator} opened here is closed only by the stream's close handler, so
+	 * callers must close the returned stream (e.g. with try-with-resources).
+	 */
+	@Override
+	public <N> Stream<K> getKeys(String state, N namespace) {
+		Tuple2<ColumnFamilyHandle, ?> columnInfo = kvStateInformation.get(state);
+		if (columnInfo == null) {
+			return Stream.empty();
+		}
+
+		RocksIterator iterator = db.newIterator(columnInfo.f0);
+		iterator.seekToFirst();
+
+		Iterable<K> iterable = () -> new RocksIteratorWrapper<>(iterator, state,
keySerializer, keyGroupPrefixBytes);
+		Stream<K> targetStream = StreamSupport.stream(iterable.spliterator(), false);
+		// tie the lifetime of the RocksDB iterator to the stream: closing the stream closes it
+		return targetStream.onClose(iterator::close);
+	}
+
 	/**
 	 * Should only be called by one thread, and only after all accesses to the DB happened.
 	 */
@@ -1978,4 +1999,44 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	public boolean supportsAsynchronousSnapshots() {
 		return true;
 	}
+
+	/**
+	 * Adapts a positioned {@link RocksIterator} into an {@link Iterator} of deserialized keys.
+	 *
+	 * <p>Each row key is decoded by skipping its first {@code keyGroupPrefixBytes} bytes and
+	 * deserializing the rest with {@code keySerializer}. Any bytes after the key are left unread
+	 * (presumably the serialized namespace; confirm against the row-key layout).
+	 *
+	 * <p>This wrapper never closes the underlying iterator; whoever created it must do so.
+	 */
+	private static class RocksIteratorWrapper<K> implements Iterator<K> {
+		private final RocksIterator iterator;
+		private final String state;
+		private final TypeSerializer<K> keySerializer;
+		private final int keyGroupPrefixBytes;
+
+		public RocksIteratorWrapper(
+				RocksIterator iterator,
+				String state,
+				TypeSerializer<K> keySerializer,
+				int keyGroupPrefixBytes) {
+			this.iterator = Preconditions.checkNotNull(iterator);
+			this.state = Preconditions.checkNotNull(state);
+			this.keySerializer = Preconditions.checkNotNull(keySerializer);
+			// a primitive int can never be null; validate the value that actually matters instead
+			Preconditions.checkArgument(keyGroupPrefixBytes >= 0, "Negative key-group prefix length.");
+			this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return iterator.isValid();
+		}
+
+		@Override
+		public K next() {
+			if (!hasNext()) {
+				throw new NoSuchElementException("Failed to access state [" + state + "]");
+			}
+			try {
+				byte[] key = iterator.key();
+				// skip the key-group prefix and deserialize the user key from the remaining bytes
+				DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(
+					new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes));
+				K value = keySerializer.deserialize(dataInput);
+				// advance only after the current row was decoded successfully
+				iterator.next();
+				return value;
+			} catch (IOException e) {
+				throw new FlinkRuntimeException("Failed to access state [" + state + "]", e);
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index 09e27e7..c74cfcf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.heap.InternalKeyContext;
 
+import java.util.stream.Stream;
+
 /**
  * A keyed state backend provides methods for managing keyed state.
  *
@@ -37,6 +39,14 @@ public interface KeyedStateBackend<K> extends InternalKeyContext<K> {
 	void setCurrentKey(K newKey);
 
 	/**
+	 * Returns a stream of all keys for the given state and namespace.
+	 *
+	 * <p>Modifications to the state while iterating over its keys are not supported. The returned
+	 * stream should be closed after use (e.g. with try-with-resources), because implementations may
+	 * release resources in the stream's close handler.
+	 *
+	 * @param state Name of the state variable for which existing keys will be returned.
+	 * @param namespace Namespace for which existing keys will be returned.
+	 * @param <N> The type of the namespace.
+	 * @return A stream of all keys for the given state and namespace.
+	 */
+	<N> Stream<K> getKeys(String state, N namespace);
+
+	/**
 	 * Creates or retrieves a keyed state backed by this state backend.
 	 *
 	 * @param namespaceSerializer The serializer used for the namespace type of the state

http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
index 7b61da1..c5f2937 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +34,8 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.TreeSet;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 /**
  * Implementation of Flink's in-memory state tables with copy-on-write support. This map
does not support null values
@@ -287,6 +290,13 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
 	}
 
 	@Override
+	public Stream<K> getKeys(N namespace) {
+		Iterable<StateEntry<K, N, S>> iterable = () -> iterator();
+		// The table iterator visits entries of every namespace. Keep only the requested one, so that
+		// a key is neither reported for a foreign namespace nor returned once per namespace.
+		return StreamSupport.stream(iterable.spliterator(), false)
+			.filter(entry -> Objects.equals(namespace, entry.getNamespace()))
+			.map(StateEntry::getKey);
+	}
+
+	@Override
 	public void put(K key, int keyGroup, N namespace, S state) {
 		put(key, namespace, state);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index bf92b34..28c623f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -78,6 +78,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.RunnableFuture;
+import java.util.stream.Stream;
 
 /**
  * A {@link AbstractKeyedStateBackend} that keeps state on the Java Heap and will serialize
state to
@@ -212,6 +213,15 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return stateTable;
 	}
 
+	@Override
+	public <N> Stream<K> getKeys(String state, N namespace) {
+		// a single map lookup instead of containsKey() followed by get()
+		@SuppressWarnings("unchecked")
+		StateTable<K, N, ?> table = (StateTable<K, N, ?>) stateTables.get(state);
+		if (table == null) {
+			// no state registered under this name, hence no keys
+			return Stream.empty();
+		}
+		return table.getKeys(namespace);
+	}
+
 	private boolean hasRegisteredState() {
 		return !stateTables.isEmpty();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
index 02d0cd4..870ecbf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
@@ -27,8 +27,11 @@ import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Stream;
 
 /**
  * This implementation of {@link StateTable} uses nested {@link HashMap} objects. It is also
maintaining a partitioning
@@ -166,6 +169,14 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
 		return get(key, keyGroup, namespace);
 	}
 
+	@Override
+	public Stream<K> getKeys(N namespace) {
+		// Skip unallocated (null) slots of the state array, look the namespace up in each remaining
+		// map (treating a missing namespace as empty) and emit the keys stored under it.
+		return Arrays.stream(state)
+			.filter(byNamespace -> byNamespace != null)
+			.flatMap(byNamespace -> byNamespace.getOrDefault(namespace, Collections.emptyMap()).keySet().stream());
+	}
+
 	// ------------------------------------------------------------------------
 
 	private boolean containsKey(K key, int keyGroupIndex, N namespace) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index c1cdcc3..8c07b25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -24,6 +24,8 @@ import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.util.Preconditions;
 
+import java.util.stream.Stream;
+
 /**
  * Base class for state tables. Accesses to state are typically scoped by the currently active
key, as provided
  * through the {@link InternalKeyContext}.
@@ -158,6 +160,8 @@ public abstract class StateTable<K, N, S> {
 	 */
 	public abstract S get(K key, N namespace);
 
+	/**
+	 * Returns a stream of all keys stored in this table under the given namespace.
+	 *
+	 * @param namespace the namespace whose keys are returned
+	 * @return a stream of the keys that have state under {@code namespace}
+	 */
+	public abstract Stream<K> getKeys(N namespace);
+
 	// Meta data setter / getter and toString -----------------------------------------------------
 
 	public TypeSerializer<S> getStateSerializer() {

http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index f6f73f2..7dd652c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -60,13 +60,14 @@ import org.apache.flink.runtime.state.heap.StateTable;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
-import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
+
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
@@ -83,12 +84,14 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.PrimitiveIterator;
 import java.util.Random;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RunnableFuture;
+import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
@@ -190,6 +193,39 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@Test
+	public void testGetKeys() throws Exception {
+		final int elementsToTest = 1000;
+		String fieldName = "get-keys-test";
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+		try {
+			ValueState<Integer> keyedState = backend.getOrCreateKeyedState(
+				VoidNamespaceSerializer.INSTANCE,
+				new ValueStateDescriptor<>(fieldName, IntSerializer.INSTANCE));
+			((InternalValueState<VoidNamespace, Integer>) keyedState).setCurrentNamespace(VoidNamespace.INSTANCE);
+
+			for (int key = 0; key < elementsToTest; key++) {
+				backend.setCurrentKey(key);
+				keyedState.update(key * 2);
+			}
+
+			try (Stream<Integer> keysStream = backend.getKeys(fieldName, VoidNamespace.INSTANCE).sorted())
{
+				PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
+
+				for (int expectedKey = 0; expectedKey < elementsToTest; expectedKey++) {
+					assertTrue(actualIterator.hasNext());
+					assertEquals(expectedKey, actualIterator.nextInt());
+				}
+
+				assertFalse(actualIterator.hasNext());
+			}
+		}
+		finally {
+			IOUtils.closeQuietly(backend);
+			backend.dispose();
+		}
+	}
+
+	@Test
 	@SuppressWarnings("unchecked")
 	public void testBackendUsesRegisteredKryoDefaultSerializer() throws Exception {
 		CheckpointStreamFactory streamFactory = createStreamFactory();

http://git-wip-us.apache.org/repos/asf/flink/blob/3ff05929/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
index e6adef8..2136304 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -42,13 +43,17 @@ public abstract class HeapStateBackendTestBase {
 	public boolean async;
 
+	/**
+	 * Creates a {@link HeapKeyedStateBackend} whose keys are serialized with {@link StringSerializer}.
+	 */
 	public HeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
+		return createKeyedBackend(StringSerializer.INSTANCE);
+	}
+
+	/**
+	 * Creates a {@link HeapKeyedStateBackend} for the given key serializer.
+	 *
+	 * <p>The backend uses a mocked {@link TaskKvStateRegistry}, 16 (presumably the number of key
+	 * groups) with the key-group range 0..15, and the {@link #async} flag of this test instance.
+	 *
+	 * @param keySerializer serializer for the backend's keys
+	 * @param <K> the key type
+	 * @return a newly created heap keyed state backend
+	 */
+	public <K> HeapKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K>
keySerializer) throws Exception {
 		return new HeapKeyedStateBackend<>(
-				mock(TaskKvStateRegistry.class),
-				StringSerializer.INSTANCE,
-				HeapReducingStateTest.class.getClassLoader(),
-				16,
-				new KeyGroupRange(0, 15),
-				async,
-				new ExecutionConfig());
+			mock(TaskKvStateRegistry.class),
+			keySerializer,
+			HeapReducingStateTest.class.getClassLoader(),
+			16,
+			new KeyGroupRange(0, 15),
+			async,
+			new ExecutionConfig());
 	}
-}
\ No newline at end of file
+}


Mime
View raw message