flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-5155] Deprecate ValueStateDescriptor constructors with default value
Date Fri, 13 Jan 2017 10:38:28 GMT
Repository: flink
Updated Branches:
  refs/heads/master 51a357351 -> 7a2d3bea9


[FLINK-5155] Deprecate ValueStateDescriptor constructors with default value


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a2d3bea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a2d3bea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a2d3bea

Branch: refs/heads/master
Commit: 7a2d3bea96d262de5cb963003b92833b27346af0
Parents: 51a3573
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed Jan 11 12:14:13 2017 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Jan 13 11:30:57 2017 +0100

----------------------------------------------------------------------
 .../state/RocksDBAsyncSnapshotTest.java         |  6 +--
 .../flink/api/common/state/ValueState.java      | 10 ++--
 .../api/common/state/ValueStateDescriptor.java  | 50 +++++++++++++++++++-
 .../AbstractKeyedCEPPatternOperator.java        |  6 +--
 .../runtime/query/QueryableStateClientTest.java |  2 +-
 .../runtime/query/netty/KvStateClientTest.java  |  2 +-
 .../query/netty/KvStateServerHandlerTest.java   | 10 ++--
 .../runtime/query/netty/KvStateServerTest.java  |  2 +-
 .../runtime/state/StateBackendTestBase.java     | 15 +++---
 .../streaming/api/datastream/KeyedStream.java   |  3 +-
 .../api/operators/StreamGroupedFold.java        |  2 +-
 .../api/operators/StreamGroupedReduce.java      |  2 +-
 .../api/windowing/triggers/DeltaTrigger.java    |  2 +-
 .../operators/AbstractStreamOperatorTest.java   |  2 +-
 .../api/operators/ProcessOperatorTest.java      |  2 +-
 .../operators/StreamingRuntimeContextTest.java  |  2 +-
 .../api/operators/co/CoProcessOperatorTest.java |  4 +-
 .../flink/streaming/api/scala/KeyedStream.scala |  3 +-
 .../api/scala/function/StatefulFunction.scala   |  2 +-
 .../StatefulUDFSavepointMigrationITCase.java    |  8 ++--
 .../flink/test/query/QueryableStateITCase.java  |  3 +-
 21 files changed, 90 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 002b16d..70f74b0 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -371,8 +371,7 @@ public class RocksDBAsyncSnapshotTest {
 			ValueState<String> state = getPartitionedState(
 					VoidNamespace.INSTANCE,
 					VoidNamespaceSerializer.INSTANCE,
-					new ValueStateDescriptor<>("count",
-							StringSerializer.INSTANCE, "hello"));
+					new ValueStateDescriptor<>("count", StringSerializer.INSTANCE));
 
 		}
 
@@ -383,8 +382,7 @@ public class RocksDBAsyncSnapshotTest {
 			ValueState<String> state = getPartitionedState(
 					VoidNamespace.INSTANCE,
 					VoidNamespaceSerializer.INSTANCE,
-					new ValueStateDescriptor<>("count",
-							StringSerializer.INSTANCE, "hello"));
+					new ValueStateDescriptor<>("count", StringSerializer.INSTANCE));
 
 			state.update(element.getValue());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
index de3250a..7e42daa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
@@ -45,8 +45,11 @@ public interface ValueState<T> extends State {
 	 * operator instance. If state partitioning is applied, the value returned
 	 * depends on the current operator input, as the operator maintains an
 	 * independent state for each partition.
-	 * 
-	 * @return The operator state value corresponding to the current input.
+	 *
+	 * <p>If you didn't specify a default value when creating the {@link ValueStateDescriptor}
+	 * this will return {@code null} when to value was previously set using {@link #update(Object)}.
+	 *
+	 * @return The state value corresponding to the current input.
 	 * 
 	 * @throws IOException Thrown if the system cannot access the state.
 	 */
@@ -59,8 +62,7 @@ public interface ValueState<T> extends State {
 	 * partitioned state is updated with null, the state for the current key 
 	 * will be removed and the default value is returned on the next access.
 	 * 
-	 * @param value
-	 *            The new value for the state.
+	 * @param value The new value for the state.
 	 *            
 	 * @throws IOException Thrown if the system cannot access the state.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
index 7db9116..b3006c4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -27,6 +27,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
  * value state using
  * {@link org.apache.flink.api.common.functions.RuntimeContext#getState(ValueStateDescriptor)}.
  *
+ * <p>If you don't use one of the constructors that set a default value the value that
you
+ * get when reading a {@link ValueState} using {@link ValueState#value()} will be {@code
null}.
+ *
  * @param <T> The type of the values that the value state can hold.
  */
 @PublicEvolving
@@ -38,12 +41,16 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>,
T> {
 	 * 
 	 * <p>If this constructor fails (because it is not possible to describe the type via
a class),
 	 * consider using the {@link #ValueStateDescriptor(String, TypeInformation, Object)} constructor.
-	 * 
+	 *
+	 * @deprecated Use {@link #ValueStateDescriptor(String, Class)} instead and manually manage
+	 * the default value by checking whether the contents of the state is {@code null}.
+	 *
 	 * @param name The (unique) name for the state.
 	 * @param typeClass The type of the values in the state.   
 	 * @param defaultValue The default value that will be set when requesting state without
setting
 	 *                     a value before.
 	 */
+	@Deprecated
 	public ValueStateDescriptor(String name, Class<T> typeClass, T defaultValue) {
 		super(name, typeClass, defaultValue);
 	}
@@ -51,11 +58,15 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>,
T> {
 	/**
 	 * Creates a new {@code ValueStateDescriptor} with the given name and default value.
 	 *
+	 * @deprecated Use {@link #ValueStateDescriptor(String, TypeInformation)} instead and manually
+	 * manage the default value by checking whether the contents of the state is {@code null}.
+	 *
 	 * @param name The (unique) name for the state.
 	 * @param typeInfo The type of the values in the state.
 	 * @param defaultValue The default value that will be set when requesting state without
setting
 	 *                     a value before.
 	 */
+	@Deprecated
 	public ValueStateDescriptor(String name, TypeInformation<T> typeInfo, T defaultValue)
{
 		super(name, typeInfo, defaultValue);
 	}
@@ -64,15 +75,52 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>,
T> {
 	 * Creates a new {@code ValueStateDescriptor} with the given name, default value, and the
specific
 	 * serializer.
 	 *
+	 * @deprecated Use {@link #ValueStateDescriptor(String, TypeSerializer)} instead and manually
+	 * manage the default value by checking whether the contents of the state is {@code null}.
+	 *
 	 * @param name The (unique) name for the state.
 	 * @param typeSerializer The type serializer of the values in the state.
 	 * @param defaultValue The default value that will be set when requesting state without
setting
 	 *                     a value before.
 	 */
+	@Deprecated
 	public ValueStateDescriptor(String name, TypeSerializer<T> typeSerializer, T defaultValue)
{
 		super(name, typeSerializer, defaultValue);
 	}
 
+	/**
+	 * Creates a new {@code ValueStateDescriptor} with the given name and type
+	 *
+	 * <p>If this constructor fails (because it is not possible to describe the type via
a class),
+	 * consider using the {@link #ValueStateDescriptor(String, TypeInformation)} constructor.
+	 *
+	 * @param name The (unique) name for the state.
+	 * @param typeClass The type of the values in the state.
+	 */
+	public ValueStateDescriptor(String name, Class<T> typeClass) {
+		super(name, typeClass, null);
+	}
+
+	/**
+	 * Creates a new {@code ValueStateDescriptor} with the given name and type.
+	 *
+	 * @param name The (unique) name for the state.
+	 * @param typeInfo The type of the values in the state.
+	 */
+	public ValueStateDescriptor(String name, TypeInformation<T> typeInfo) {
+		super(name, typeInfo, null);
+	}
+
+	/**
+	 * Creates a new {@code ValueStateDescriptor} with the given name and the specific serializer.
+	 *
+	 * @param name The (unique) name for the state.
+	 * @param typeSerializer The type serializer of the values in the state.
+	 */
+	public ValueStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
+		super(name, typeSerializer, null);
+	}
+
 	// ------------------------------------------------------------------------
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index b5601ef..832a0ba 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -102,8 +102,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
extends Abst
 			nfaOperatorState = getPartitionedState(
 					new ValueStateDescriptor<NFA<IN>>(
 						NFA_OPERATOR_STATE_NAME,
-						new NFA.Serializer<IN>(),
-						null));
+						new NFA.Serializer<IN>()));
 		}
 
 		@SuppressWarnings("unchecked,rawtypes")
@@ -116,8 +115,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
extends Abst
 						PRIORIRY_QUEUE_STATE_NAME,
 						new PriorityQueueSerializer<>(
 								streamRecordSerializer,
-								new PriorityQueueStreamRecordFactory<IN>()),
-						null));
+								new PriorityQueueStreamRecordFactory<IN>())));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
index 2076c08..2c385c1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
@@ -268,7 +268,7 @@ public class QueryableStateClientTest {
 				servers[i] = new KvStateServer(InetAddress.getLocalHost(), 0, 1, 1, registries[i], serverStats[i]);
 				servers[i].start();
 				ValueStateDescriptor<Integer> descriptor =
-						new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null);
+						new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
 
 				RegisteredBackendStateMetaInfo<VoidNamespace, Integer> registeredBackendStateMetaInfo
= new RegisteredBackendStateMetaInfo<>(
 						descriptor.getType(),

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
index 0db8b31..86f8766 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
@@ -562,7 +562,7 @@ public class KvStateClientTest {
 			clientTaskExecutor = Executors.newFixedThreadPool(numClientsTasks);
 
 			// Create state
-			ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE,
null);
+			ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
 			desc.setQueryable("any");
 
 			// Create servers

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
index 348d4d9..e8caf57 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
@@ -88,7 +88,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
 
 		// Register state
-		ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE,
null);
+		ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
 		desc.setQueryable("vanilla");
 
 		int numKeyGroups =1;
@@ -227,7 +227,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		registry.registerListener(registryListener);
 
 		// Register state
-		ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE,
null);
+		ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
 		desc.setQueryable("vanilla");
 
 		backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc);
@@ -372,7 +372,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		registry.registerListener(registryListener);
 
 		// Register state
-		ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE,
null);
+		ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
 		desc.setQueryable("vanilla");
 
 		backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc);
@@ -511,7 +511,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		registry.registerListener(registryListener);
 
 		// Register state
-		ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE,
null);
+		ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
 		desc.setQueryable("vanilla");
 
 		ValueState<Integer> state = backend.getPartitionedState(
@@ -607,7 +607,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		registry.registerListener(registryListener);
 
 		// Register state
-		ValueStateDescriptor<byte[]> desc = new ValueStateDescriptor<>("any", BytePrimitiveArraySerializer.INSTANCE,
null);
+		ValueStateDescriptor<byte[]> desc = new ValueStateDescriptor<>("any", BytePrimitiveArraySerializer.INSTANCE);
 		desc.setQueryable("vanilla");
 
 		ValueState<byte[]> state = backend.getPartitionedState(

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
index b1c4a9f..249b225 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
@@ -105,7 +105,7 @@ public class KvStateServerTest {
 
 			registry.registerListener(registryListener);
 
-			ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE,
null);
+			ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
 			desc.setQueryable("vanilla");
 
 			ValueState<Integer> state = backend.getPartitionedState(

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 9bc4c53..641e14b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -155,7 +155,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend>
exten
 		CheckpointStreamFactory streamFactory = createStreamFactory();
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
-		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class,
null);
+		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
 		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
@@ -253,8 +253,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend>
exten
 				new KeyGroupRange(0, 0),
 				new DummyEnvironment("test_op", 1, 0));
 
-		ValueStateDescriptor<String> desc1 = new ValueStateDescriptor<>("a-string",
StringSerializer.INSTANCE, null);
-		ValueStateDescriptor<Integer> desc2 = new ValueStateDescriptor<>("an-integer",
IntSerializer.INSTANCE, null);
+		ValueStateDescriptor<String> desc1 = new ValueStateDescriptor<>("a-string",
StringSerializer.INSTANCE);
+		ValueStateDescriptor<Integer> desc2 = new ValueStateDescriptor<>("an-integer",
IntSerializer.INSTANCE);
 
 		desc1.initializeSerializerUnlessSet(new ExecutionConfig());
 		desc2.initializeSerializerUnlessSet(new ExecutionConfig());
@@ -822,7 +822,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend>
exten
 				new KeyGroupRange(0, MAX_PARALLELISM - 1),
 				new DummyEnvironment("test", 1, 0));
 
-		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class,
null);
+		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
 		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE,
kvId);
@@ -904,7 +904,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend>
exten
 			CheckpointStreamFactory streamFactory = createStreamFactory();
 			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
-			ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class,
null);
+			ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
 			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 			ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE,
kvId);
@@ -927,7 +927,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend>
exten
 				(TypeSerializer<String>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
 
 			try {
-				kvId = new ValueStateDescriptor<>("id", fakeStringSerializer, null);
+				kvId = new ValueStateDescriptor<>("id", fakeStringSerializer);
 
 				state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE,
kvId);
 
@@ -1259,8 +1259,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend>
exten
 
 		ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>(
 				"test",
-				IntSerializer.INSTANCE,
-				null);
+				IntSerializer.INSTANCE);
 		desc.setQueryable("banana");
 
 		backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc);

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 560ecab..73d8926 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -652,8 +652,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	public QueryableStateStream<KEY, T> asQueryableState(String queryableStateName) {
 		ValueStateDescriptor<T> valueStateDescriptor = new ValueStateDescriptor<T>(
 				UUID.randomUUID().toString(),
-				getType(),
-				null);
+				getType());
 
 		return asQueryableState(queryableStateName, valueStateDescriptor);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index 86fd8e4..76a18d8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -71,7 +71,7 @@ public class StreamGroupedFold<IN, OUT, KEY>
 			initialValue = outTypeSerializer.deserialize(in);
 		}
 		
-		ValueStateDescriptor<OUT> stateId = new ValueStateDescriptor<>(STATE_NAME,
outTypeSerializer, null);
+		ValueStateDescriptor<OUT> stateId = new ValueStateDescriptor<>(STATE_NAME,
outTypeSerializer);
 		values = getPartitionedState(stateId);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index 48b9c2d..156f336 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -45,7 +45,7 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN,
Reduc
 	@Override
 	public void open() throws Exception {
 		super.open();
-		ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer,
null);
+		ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer);
 		values = getPartitionedState(stateId);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
index 4a7262a..89cca22 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -46,7 +46,7 @@ public class DeltaTrigger<T, W extends Window> extends Trigger<T,
W> {
 	private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T>
stateSerializer) {
 		this.deltaFunction = deltaFunction;
 		this.threshold = threshold;
-		stateDesc = new ValueStateDescriptor<>("last-element", stateSerializer, null);
+		stateDesc = new ValueStateDescriptor<>("last-element", stateSerializer);
 
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 2844fbb..f4051c9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -491,7 +491,7 @@ public class AbstractStreamOperatorTest {
 		private transient InternalTimerService<VoidNamespace> timerService;
 
 		private final ValueStateDescriptor<String> stateDescriptor =
-				new ValueStateDescriptor<>("state", StringSerializer.INSTANCE, null);
+				new ValueStateDescriptor<>("state", StringSerializer.INSTANCE);
 
 		@Override
 		public void open() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
index 74fd044..89d9899 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
@@ -349,7 +349,7 @@ public class ProcessOperatorTest extends TestLogger {
 		private static final long serialVersionUID = 1L;
 
 		private final ValueStateDescriptor<Integer> state =
-				new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE,  null);
+				new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE);
 
 		private final TimeDomain timeDomain;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index 0d9003f..2791726 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -77,7 +77,7 @@ public class StreamingRuntimeContextTest {
 				createMockEnvironment(),
 				Collections.<String, Accumulator<?, ?>>emptyMap());
 
-		ValueStateDescriptor<TaskInfo> descr = new ValueStateDescriptor<>("name", TaskInfo.class,
null);
+		ValueStateDescriptor<TaskInfo> descr = new ValueStateDescriptor<>("name", TaskInfo.class);
 		context.getState(descr);
 		
 		StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, ?>) descriptorCapture.get();

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
index a449359..eea428f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
@@ -398,7 +398,7 @@ public class CoProcessOperatorTest extends TestLogger {
 		private static final long serialVersionUID = 1L;
 
 		private final ValueStateDescriptor<String> state =
-				new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null);
+				new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE);
 
 		@Override
 		public void processElement1(Integer value, Context ctx, Collector<String> out) throws
Exception {
@@ -479,7 +479,7 @@ public class CoProcessOperatorTest extends TestLogger {
 		private static final long serialVersionUID = 1L;
 
 		private final ValueStateDescriptor<String> state =
-				new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null);
+				new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE);
 
 		@Override
 		public void processElement1(Integer value, Context ctx, Collector<String> out) throws
Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index f2999b3..b251ca6 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -474,8 +474,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
   def asQueryableState(queryableStateName: String) : QueryableStateStream[K, T] = {
     val stateDescriptor = new ValueStateDescriptor(
       queryableStateName,
-      dataType.createSerializer(executionConfig),
-      null.asInstanceOf[T])
+      dataType.createSerializer(executionConfig))
 
     asQueryableState(queryableStateName, stateDescriptor)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
index 2cb2761..52dc1a6 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
@@ -47,7 +47,7 @@ trait StatefulFunction[I, O, S] extends RichFunction {
   }
 
   override def open(c: Configuration) = {
-    val info = new ValueStateDescriptor[S]("state", stateSerializer, null.asInstanceOf[S])
+    val info = new ValueStateDescriptor[S]("state", stateSerializer)
     state = getRuntimeContext().getState(info)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
index cc21683..10a8998 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
@@ -358,7 +358,7 @@ public class StatefulUDFSavepointMigrationITCase extends SavepointMigrationTestB
 				new Tuple2<>("hello", 42L);
 
 		private final ValueStateDescriptor<Long> stateDescriptor =
-				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null);
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
 
 		@Override
 		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>>
out) throws Exception {
@@ -385,7 +385,7 @@ public class StatefulUDFSavepointMigrationITCase extends SavepointMigrationTestB
 		private transient Tuple2<String, Long> restoredState;
 
 		private final ValueStateDescriptor<Long> stateDescriptor =
-				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null);
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
 
 		@Override
 		public void open(Configuration parameters) throws Exception {
@@ -419,7 +419,7 @@ public class StatefulUDFSavepointMigrationITCase extends SavepointMigrationTestB
 		private static final long serialVersionUID = 1L;
 
 		private final ValueStateDescriptor<Long> stateDescriptor =
-				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null);
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
 
 		@Override
 		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>>
out) throws Exception {
@@ -434,7 +434,7 @@ public class StatefulUDFSavepointMigrationITCase extends SavepointMigrationTestB
 		private static final long serialVersionUID = 1L;
 
 		private final ValueStateDescriptor<Long> stateDescriptor =
-				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE, null);
+				new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
 
 		@Override
 		public void open(Configuration parameters) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7a2d3bea/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index 40daeaf..a5ed6ad 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -603,8 +603,7 @@ public class QueryableStateITCase extends TestLogger {
 			// Value state
 			ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>(
 					"any",
-					source.getType(),
-					null);
+					source.getType());
 
 			QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
 					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {


Mime
View raw message