flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-5530] [queryable state] Fix race condition in AbstractRocksDBState#getSerializedValue
Date Sun, 22 Jan 2017 10:45:09 GMT
Repository: flink
Updated Branches:
  refs/heads/master 4e050b8cb -> d16552dbe


[FLINK-5530] [queryable state] Fix race condition in AbstractRocksDBState#getSerializedValue

AbstractRocksDBState#getSerializedValue() uses the same key serialisation
stream as the ordinary state access methods, but is called in parallel during
state queries, thus violating the assumption of only one thread accessing it.

This may lead to either wrong results in queries or corrupt data while queries
are executed.

This closes #3143.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d16552db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d16552db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d16552db

Branch: refs/heads/master
Commit: d16552dbefccf8ab7e0e24f8940d555b34a02732
Parents: 4e050b8
Author: Nico Kruber <nico@data-artisans.com>
Authored: Tue Jan 17 17:38:29 2017 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Sun Jan 22 11:44:57 2017 +0100

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   |  70 ++++++++---
 .../runtime/state/StateBackendTestBase.java     | 117 +++++++++++++++++++
 .../flink/core/testutils/CheckedThread.java     |  19 +++
 3 files changed, 187 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d16552db/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 9da33ef..6785f17 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -73,7 +73,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 	private final WriteOptions writeOptions;
 
 	protected final ByteArrayOutputStreamWithPos keySerializationStream;
-	protected final DataOutputView keySerializationDateDataOutputView;
+	protected final DataOutputView keySerializationDataOutputView;
 
 	private final boolean ambiguousKeyPossible;
 
@@ -97,7 +97,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 		this.stateDesc = Preconditions.checkNotNull(stateDesc, "State Descriptor");
 
 		this.keySerializationStream = new ByteArrayOutputStreamWithPos(128);
-		this.keySerializationDateDataOutputView = new DataOutputViewStreamWrapper(keySerializationStream);
+		this.keySerializationDataOutputView = new DataOutputViewStreamWrapper(keySerializationStream);
 		this.ambiguousKeyPossible = (backend.getKeySerializer().getLength() < 0)
 				&& (namespaceSerializer.getLength() < 0);
 	}
@@ -132,55 +132,87 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 				namespaceSerializer);
 
 		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
-		writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
-		return backend.db.get(columnFamily, keySerializationStream.toByteArray());
 
+		// we cannot reuse the keySerializationStream member since this method
+		// is called concurrently to the other ones and it may thus contain garbage
+		ByteArrayOutputStreamWithPos tmpKeySerializationStream = new ByteArrayOutputStreamWithPos(128);
+		DataOutputViewStreamWrapper tmpKeySerializationDateDataOutputView = new DataOutputViewStreamWrapper(tmpKeySerializationStream);
+
+		writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
+			tmpKeySerializationStream, tmpKeySerializationDateDataOutputView);
+
+		return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray());
 	}
 
 	protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
-		writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), backend.getCurrentKey(), currentNamespace);
+		writeKeyWithGroupAndNamespace(
+			backend.getCurrentKeyGroupIndex(),
+			backend.getCurrentKey(),
+			currentNamespace,
+			keySerializationStream,
+			keySerializationDataOutputView);
 	}
 
-	protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N namespace) throws IOException {
+	protected void writeKeyWithGroupAndNamespace(
+			int keyGroup, K key, N namespace,
+			ByteArrayOutputStreamWithPos keySerializationStream,
+			DataOutputView keySerializationDataOutputView) throws IOException {
+
 		keySerializationStream.reset();
-		writeKeyGroup(keyGroup);
-		writeKey(key);
-		writeNameSpace(namespace);
+		writeKeyGroup(keyGroup, keySerializationDataOutputView);
+		writeKey(key, keySerializationStream, keySerializationDataOutputView);
+		writeNameSpace(namespace, keySerializationStream, keySerializationDataOutputView);
 	}
 
-	private void writeKeyGroup(int keyGroup) throws IOException {
+	private void writeKeyGroup(
+			int keyGroup,
+			DataOutputView keySerializationDateDataOutputView) throws IOException {
 		for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
 			keySerializationDateDataOutputView.writeByte(keyGroup >>> (i << 3));
 		}
 	}
 
-	private void writeKey(K key) throws IOException {
+	private void writeKey(
+			K key,
+			ByteArrayOutputStreamWithPos keySerializationStream,
+			DataOutputView keySerializationDataOutputView) throws IOException {
 		//write key
 		int beforeWrite = keySerializationStream.getPosition();
-		backend.getKeySerializer().serialize(key, keySerializationDateDataOutputView);
+		backend.getKeySerializer().serialize(key, keySerializationDataOutputView);
 
 		if (ambiguousKeyPossible) {
 			//write size of key
-			writeLengthFrom(beforeWrite);
+			writeLengthFrom(beforeWrite, keySerializationStream,
+				keySerializationDataOutputView);
 		}
 	}
 
-	private void writeNameSpace(N namespace) throws IOException {
+	private void writeNameSpace(
+			N namespace,
+			ByteArrayOutputStreamWithPos keySerializationStream,
+			DataOutputView keySerializationDataOutputView) throws IOException {
 		int beforeWrite = keySerializationStream.getPosition();
-		namespaceSerializer.serialize(namespace, keySerializationDateDataOutputView);
+		namespaceSerializer.serialize(namespace, keySerializationDataOutputView);
 
 		if (ambiguousKeyPossible) {
 			//write length of namespace
-			writeLengthFrom(beforeWrite);
+			writeLengthFrom(beforeWrite, keySerializationStream,
+				keySerializationDataOutputView);
 		}
 	}
 
-	private void writeLengthFrom(int fromPosition) throws IOException {
+	private static void writeLengthFrom(
+			int fromPosition,
+			ByteArrayOutputStreamWithPos keySerializationStream,
+			DataOutputView keySerializationDateDataOutputView) throws IOException {
 		int length = keySerializationStream.getPosition() - fromPosition;
-		writeVariableIntBytes(length);
+		writeVariableIntBytes(length, keySerializationDateDataOutputView);
 	}
 
-	private void writeVariableIntBytes(int value) throws IOException {
+	private static void writeVariableIntBytes(
+			int value,
+			DataOutputView keySerializationDateDataOutputView)
+			throws IOException {
 		do {
 			keySerializationDateDataOutputView.writeByte(value);
 			value >>>= 8;

http://git-wip-us.apache.org/repos/asf/flink/blob/d16552db/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 641e14b..38e04aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -38,6 +38,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -54,6 +55,8 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RunnableFuture;
 
@@ -242,6 +245,120 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		backend.dispose();
 	}
 
+	/**
+	 * Tests {@link ValueState#value()} and {@link KvState#getSerializedValue(byte[])}
+	 * accessing the state concurrently. They should not get in the way of each
+	 * other.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testValueStateRace() throws Exception {
+		final AbstractKeyedStateBackend<Integer> backend =
+			createKeyedBackend(IntSerializer.INSTANCE);
+		final Integer namespace = Integer.valueOf(1);
+
+		final ValueStateDescriptor<String> kvId =
+			new ValueStateDescriptor<>("id", String.class);
+		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		final TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+		final TypeSerializer<Integer> namespaceSerializer =
+			IntSerializer.INSTANCE;
+		final TypeSerializer<String> valueSerializer = kvId.getSerializer();
+
+		final ValueState<String> state = backend
+			.getPartitionedState(namespace, IntSerializer.INSTANCE, kvId);
+
+		@SuppressWarnings("unchecked")
+		final KvState<Integer> kvState = (KvState<Integer>) state;
+
+		/**
+		 * 1) Test that ValueState#value() before and after
+		 * KvState#getSerializedValue(byte[]) return the same value.
+		 */
+
+		// set some key and namespace
+		final int key1 = 1;
+		backend.setCurrentKey(key1);
+		kvState.setCurrentNamespace(2);
+		state.update("2");
+		assertEquals("2", state.value());
+
+		// query another key and namespace
+		assertNull(getSerializedValue(kvState, 3, keySerializer,
+			namespace, IntSerializer.INSTANCE,
+			valueSerializer));
+
+		// the state should not have changed!
+		assertEquals("2", state.value());
+
+		// re-set values
+		kvState.setCurrentNamespace(namespace);
+
+		/**
+		 * 2) Test two threads concurrently using ValueState#value() and
+		 * KvState#getSerializedValue(byte[]).
+		 */
+
+		// some modifications to the state
+		final int key2 = 10;
+		backend.setCurrentKey(key2);
+		assertNull(state.value());
+		assertNull(getSerializedValue(kvState, key2, keySerializer,
+			namespace, namespaceSerializer, valueSerializer));
+		state.update("1");
+
+		final CheckedThread getter = new CheckedThread("State getter") {
+			@Override
+			public void go() throws Exception {
+				while (!isInterrupted()) {
+					assertEquals("1", state.value());
+				}
+			}
+		};
+
+		final CheckedThread serializedGetter = new CheckedThread("Serialized state getter") {
+			@Override
+			public void go() throws Exception {
+				while(!isInterrupted() && getter.isAlive()) {
+					final String serializedValue =
+						getSerializedValue(kvState, key2, keySerializer,
+							namespace, namespaceSerializer,
+							valueSerializer);
+					assertEquals("1", serializedValue);
+				}
+			}
+		};
+
+		getter.start();
+		serializedGetter.start();
+
+		// run both threads for max 100ms
+		Timer t = new Timer("stopper");
+		t.schedule(new TimerTask() {
+			@Override
+			public void run() {
+				getter.interrupt();
+				serializedGetter.interrupt();
+				this.cancel();
+			}
+		}, 100);
+
+		// wait for both threads to finish
+		try {
+			// serializedGetter will finish if its assertion fails or if
+			// getter is not alive any more
+			serializedGetter.sync();
+			// if serializedGetter crashed, getter will not know -> interrupt just in case
+			getter.interrupt();
+			getter.sync();
+			t.cancel(); // if not executed yet
+		} finally {
+			// clean up
+			backend.dispose();
+		}
+	}
+
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testMultipleValueStates() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/d16552db/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
index 1dad8c8..5de6a87 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
@@ -37,6 +37,25 @@ public abstract class CheckedThread extends Thread {
 	// ------------------------------------------------------------------------
 
 	/**
+	 * Unnamed checked thread.
+	 */
+	public CheckedThread() {
+		super();
+	}
+
+	/**
+	 * Checked thread with a name.
+	 *
+	 * @param name
+	 * 		the name of the new thread
+	 *
+	 * @see Thread#Thread(String)
+	 */
+	public CheckedThread(final String name) {
+		super(name);
+	}
+
+	/**
 	 * This method needs to be overwritten to contain the main work logic.
 	 * It takes the role of {@link Thread#run()}, but should propagate exceptions.
 	 *


Mime
View raw message