flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-3339] Make ValueState.update(null) act as ValueState.clear()
Date Mon, 08 Feb 2016 12:02:25 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6d83c9d95 -> 7469c17cb


[FLINK-3339] Make ValueState.update(null) act as ValueState.clear()

This was causing problems with TypeSerializers that choke on null
values, especially in the Scala KeyedStream.*WithState() family of
functions.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7469c17c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7469c17c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7469c17c

Branch: refs/heads/master
Commit: 7469c17cb9004873101622ca4cc19f95b8d79642
Parents: 6d83c9d
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Fri Feb 5 14:46:14 2016 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Feb 8 13:02:04 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBValueState.java      |  4 +
 .../runtime/state/filesystem/FsValueState.java  |  5 ++
 .../runtime/state/memory/MemValueState.java     |  5 ++
 .../runtime/state/StateBackendTestBase.java     | 81 +++++++++++++++++++-
 4 files changed, 94 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7469c17c/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 8767a86..f51e160 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -102,6 +102,10 @@ public class RocksDBValueState<K, N, V, Backend extends AbstractStateBackend>
 
 	@Override
 	public void update(V value) throws IOException {
+		if (value == null) {
+			clear();
+			return;
+		}
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/7469c17c/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsValueState.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsValueState.java
index 1a53980..40b973e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsValueState.java
@@ -94,6 +94,11 @@ public class FsValueState<K, N, V>
 			throw new RuntimeException("No key available.");
 		}
 
+		if (value == null) {
+			clear();
+			return;
+		}
+
 		if (currentNSState == null) {
 			currentNSState = new HashMap<>();
 			state.put(currentNamespace, currentNSState);

http://git-wip-us.apache.org/repos/asf/flink/blob/7469c17c/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemValueState.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemValueState.java
index 8ce166a..45b4158 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemValueState.java
@@ -69,6 +69,11 @@ public class MemValueState<K, N, V>
 			throw new RuntimeException("No key available.");
 		}
 
+		if (value == null) {
+			clear();
+			return;
+		}
+
 		if (currentNSState == null) {
 			currentNSState = new HashMap<>();
 			state.put(currentNamespace, currentNSState);

http://git-wip-us.apache.org/repos/asf/flink/blob/7469c17c/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 20a46a6..6083bd6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 
 import com.google.common.base.Joiner;
 
+import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ListState;
@@ -31,7 +32,9 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.types.IntValue;
 
@@ -113,12 +116,14 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend>
{
 
 			kv.dispose();
 
-//			 restore the first snapshot and validate it
+			// restore the first snapshot and validate it
 			KvState<Integer, Void, ValueState<String>, ValueStateDescriptor<String>,
B> restored1 = snapshot1.restoreState(
 					backend,
 					IntSerializer.INSTANCE,
 					this.getClass().getClassLoader(), 10);
 
+			snapshot1.discardState();
+
 			@SuppressWarnings("unchecked")
 			ValueState<String> restored1State = (ValueState<String>) restored1;
 
@@ -135,6 +140,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend>
{
 					IntSerializer.INSTANCE,
 					this.getClass().getClassLoader(), 10);
 
+		snapshot2.discardState();
+
 			@SuppressWarnings("unchecked")
 			ValueState<String> restored2State = (ValueState<String>) restored2;
 
@@ -146,6 +153,70 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend>
{
 			assertEquals("u3", restored2State.value());
 	}
 
+	/**
+	 * This test verifies that passing {@code null} to {@link ValueState#update(Object)} acts
+	 * the same as {@link ValueState#clear()}.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testValueStateNullUpdate() throws Exception {
+
+		// precondition: LongSerializer must fail on null value. this way the test would fail
+		// later if null values where actually stored in the state instead of acting as clear()
+		try {
+			LongSerializer.INSTANCE.serialize(null,
+				new DataOutputViewStreamWrapper(new ByteArrayOutputStream()));
+			fail("Should faill with NullPointerException");
+		} catch (NullPointerException e) {
+			// alrighty
+		}
+
+		backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
+
+		ValueStateDescriptor<Long> kvId = new ValueStateDescriptor<>("id", LongSerializer.INSTANCE,
42L);
+		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		ValueState<Long> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE,
kvId);
+
+		@SuppressWarnings("unchecked")
+		KvState<Integer, Void, ValueState<Long>, ValueStateDescriptor<Long>, B>
kv =
+			(KvState<Integer, Void, ValueState<Long>, ValueStateDescriptor<Long>, B>)
state;
+
+		// some modifications to the state
+		kv.setCurrentKey(1);
+
+		// verify default value
+		assertEquals(42L, (long) state.value());
+		state.update(1L);
+		assertEquals(1L, (long) state.value());
+
+		kv.setCurrentKey(2);
+		assertEquals(42L, (long) state.value());
+
+		kv.setCurrentKey(1);
+		state.clear();
+		assertEquals(42L, (long) state.value());
+
+		state.update(17L);
+		assertEquals(17L, (long) state.value());
+
+		state.update(null);
+		assertEquals(42L, (long) state.value());
+
+		// draw a snapshot, this would fail with a NPE if update(null) would not act as clear()
+		KvStateSnapshot<Integer, Void, ValueState<Long>, ValueStateDescriptor<Long>,
B> snapshot1 =
+			kv.snapshot(682375462378L, 2);
+
+		kv.dispose();
+
+		// restore the snapshot
+		snapshot1.restoreState(
+			backend,
+			IntSerializer.INSTANCE,
+			this.getClass().getClassLoader(), 10);
+	}
+
 	@Test
 	@SuppressWarnings("unchecked,rawtypes")
 	public void testListState() {
@@ -202,6 +273,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend>
{
 					IntSerializer.INSTANCE,
 					this.getClass().getClassLoader(), 10);
 
+			snapshot1.discardState();
+
 			@SuppressWarnings("unchecked")
 			ListState<String> restored1State = (ListState<String>) restored1;
 
@@ -218,6 +291,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend>
{
 					IntSerializer.INSTANCE,
 					this.getClass().getClassLoader(), 20);
 
+			snapshot2.discardState();
+
 			@SuppressWarnings("unchecked")
 			ListState<String> restored2State = (ListState<String>) restored2;
 
@@ -299,6 +374,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend>
{
 				IntSerializer.INSTANCE,
 				this.getClass().getClassLoader(), 10);
 
+			snapshot1.discardState();
+
 			@SuppressWarnings("unchecked")
 			ReducingState<String> restored1State = (ReducingState<String>) restored1;
 
@@ -315,6 +392,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend>
{
 				IntSerializer.INSTANCE,
 				this.getClass().getClassLoader(), 20);
 
+			snapshot2.discardState();
+
 			@SuppressWarnings("unchecked")
 			ReducingState<String> restored2State = (ReducingState<String>) restored2;
 


Mime
View raw message