flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [15/16] flink git commit: [FLINK-3312] Add simple constructors for State Descriptors
Date Wed, 03 Feb 2016 20:12:34 GMT
[FLINK-3312] Add simple constructors for State Descriptors


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/180cd3f6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/180cd3f6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/180cd3f6

Branch: refs/heads/master
Commit: 180cd3f608731208f5b5ed71e3eb80ae5ccdf5fc
Parents: 456d0ab
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Feb 3 17:44:07 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 3 20:28:41 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   |   4 +-
 .../contrib/streaming/state/DbStateBackend.java |  22 +-
 .../state/DBStateCheckpointingTest.java         |  13 +-
 .../streaming/state/DbStateBackendTest.java     |  16 +-
 .../api/common/state/ListStateDescriptor.java   |  82 +++---
 .../common/state/ReducingStateDescriptor.java   |  97 ++++---
 .../apache/flink/api/common/state/State.java    |   8 +-
 .../flink/api/common/state/StateBackend.java    |   9 +-
 .../flink/api/common/state/StateDescriptor.java | 264 ++++++++++++++++++-
 .../api/common/state/ValueStateDescriptor.java  | 162 +++---------
 .../common/state/ListStateDescriptorTest.java   |  91 +++++++
 .../state/ReducingStateDescriptorTest.java      | 105 ++++++++
 .../common/state/ValueStateDescriptorTest.java  | 104 ++++++++
 .../examples/windowing/SessionWindowing.java    |   6 +-
 .../flink/runtime/state/AbstractHeapState.java  |   2 +-
 .../runtime/state/AbstractStateBackend.java     |   7 +-
 .../org/apache/flink/runtime/state/KvState.java |   2 +-
 .../flink/runtime/state/KvStateSnapshot.java    |   2 +-
 .../state/filesystem/AbstractFsState.java       |   2 +-
 .../filesystem/AbstractFsStateSnapshot.java     |   2 +-
 .../runtime/state/memory/AbstractMemState.java  |   2 +-
 .../state/memory/AbstractMemStateSnapshot.java  |   2 +-
 .../runtime/state/StateBackendTestBase.java     |  31 ++-
 .../api/operators/AbstractStreamOperator.java   |   4 +-
 .../api/operators/StreamGroupedFold.java        |  11 +-
 .../api/operators/StreamGroupedReduce.java      |   2 +-
 .../api/operators/StreamingRuntimeContext.java  |   5 +-
 .../triggers/ContinuousEventTimeTrigger.java    |   8 +-
 .../ContinuousProcessingTimeTrigger.java        |  11 +-
 .../api/windowing/triggers/CountTrigger.java    |  10 +-
 .../api/windowing/triggers/DeltaTrigger.java    |   2 +-
 .../api/windowing/triggers/Trigger.java         |   2 +-
 .../windowing/EvictingWindowOperator.java       |   8 +-
 .../windowing/NonKeyedWindowOperator.java       |   5 +-
 .../operators/windowing/WindowOperator.java     |  13 +-
 .../operators/StreamingRuntimeContextTest.java  | 156 +++++++++++
 ...AlignedProcessingTimeWindowOperatorTest.java |   2 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |   3 +-
 .../api/scala/function/StatefulFunction.scala   |   2 +-
 .../EventTimeWindowCheckpointingITCase.java     |   4 +-
 .../PartitionedStateCheckpointingITCase.java    |  10 +-
 .../StreamCheckpointingITCase.java              |   4 +-
 42 files changed, 953 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 6dbe16c..3ecec5d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -62,7 +62,7 @@ import static java.util.Objects.requireNonNull;
  * @param <SD> The type of {@link StateDescriptor}.
  * @param <Backend> The type of the backend that snapshots this key/value state.
  */
-public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend>
+public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
 	implements KvState<K, N, S, SD, Backend>, State {
 
 	private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class);
@@ -258,7 +258,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 		}
 	}
 
-	public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend> implements KvStateSnapshot<K, N, S, SD, Backend> {
+	public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> implements KvStateSnapshot<K, N, S, SD, Backend> {
 		private static final long serialVersionUID = 1L;
 
 		private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSnapshot.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
index c55b3c0..1d1ccd7 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
@@ -195,6 +195,11 @@ public class DbStateBackend extends AbstractStateBackend {
 	@Override
 	protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer,
 		ValueStateDescriptor<T> stateDesc) throws Exception {
+		
+		if (!stateDesc.isSerializerInitialized()) {
+			throw new IllegalArgumentException("state descriptor serializer not initialized");
+		}
+		
 		String stateName = operatorIdentifier + "_"+ stateDesc.getName();
 
 		return new LazyDbValueState<>(
@@ -210,7 +215,14 @@ public class DbStateBackend extends AbstractStateBackend {
 	@Override
 	protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer,
 		ListStateDescriptor<T> stateDesc) throws Exception {
-		ValueStateDescriptor<ArrayList<T>> valueStateDescriptor = new ValueStateDescriptor<>(stateDesc.getName(), null, new ArrayListSerializer<>(stateDesc.getSerializer()));
+
+		if (!stateDesc.isSerializerInitialized()) {
+			throw new IllegalArgumentException("state descriptor serializer not initialized");
+		}
+		
+		ValueStateDescriptor<ArrayList<T>> valueStateDescriptor = new ValueStateDescriptor<>(stateDesc.getName(), 
+				new ArrayListSerializer<>(stateDesc.getSerializer()), null);
+		
 		ValueState<ArrayList<T>> valueState = createValueState(namespaceSerializer, valueStateDescriptor);
 		return new GenericListState<>(valueState);
 	}
@@ -220,7 +232,13 @@ public class DbStateBackend extends AbstractStateBackend {
 	protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer,
 		ReducingStateDescriptor<T> stateDesc) throws Exception {
 
-		ValueStateDescriptor<T> valueStateDescriptor = new ValueStateDescriptor<>(stateDesc.getName(), null, stateDesc.getSerializer());
+		if (!stateDesc.isSerializerInitialized()) {
+			throw new IllegalArgumentException("state descriptor serializer not initialized");
+		}
+		
+		ValueStateDescriptor<T> valueStateDescriptor = new ValueStateDescriptor<>(
+				stateDesc.getName(), stateDesc.getSerializer(), null);
+		
 		ValueState<T> valueState = createValueState(namespaceSerializer, valueStateDescriptor);
 		return new GenericReducingState<>(valueState, stateDesc.getReduceFunction());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
index 0afdada..cc2147f 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
@@ -31,14 +31,13 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.io.FileUtils;
+
 import org.apache.derby.drda.NetworkServerControl;
-import org.apache.flink.api.common.ExecutionConfig;
+
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -210,7 +209,7 @@ public class DBStateCheckpointingTest extends StreamFaultToleranceTestBase {
 			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
 			count = 0;
 			sum = getRuntimeContext().getState(
-					new ValueStateDescriptor<>("my_state", 0L, LongSerializer.INSTANCE));
+					new ValueStateDescriptor<>("my_state", Long.class, 0L));
 		}
 
 		@Override
@@ -238,11 +237,9 @@ public class DBStateCheckpointingTest extends StreamFaultToleranceTestBase {
 		@Override
 		public void open(Configuration parameters) throws IOException {
 			aCounts = getRuntimeContext().getState(
-					new ValueStateDescriptor<>("a", NonSerializableLong.of(0L), 
-							new KryoSerializer<>(NonSerializableLong.class, new ExecutionConfig())));
+					new ValueStateDescriptor<>("a", NonSerializableLong.class, NonSerializableLong.of(0L)));
 			
-			bCounts = getRuntimeContext().getState(
-					new ValueStateDescriptor<>("b", 0L, LongSerializer.INSTANCE));
+			bCounts = getRuntimeContext().getState(new ValueStateDescriptor<>("b", Long.class, 0L));
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
index 34adf75..d4883dd 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
@@ -63,7 +63,6 @@ import org.junit.Test;
 import com.google.common.base.Optional;
 
 import static org.junit.Assert.*;
-import static org.junit.Assert.fail;
 
 public class DbStateBackendTest {
 
@@ -201,7 +200,7 @@ public class DbStateBackendTest {
 			backend.initializeForJob(env, "dummy_test_kv", IntSerializer.INSTANCE);
 
 			ValueState<String> state = backend.createValueState(IntSerializer.INSTANCE,
-				new ValueStateDescriptor<>("state1", null, StringSerializer.INSTANCE));
+				new ValueStateDescriptor<>("state1", StringSerializer.INSTANCE, null));
 
 			LazyDbValueState<Integer, Integer, String> kv = (LazyDbValueState<Integer, Integer, String>) state;
 
@@ -455,9 +454,14 @@ public class DbStateBackendTest {
 		backend2.initializeForJob(new DummyEnvironment("test", 3, 1), "dummy_2", StringSerializer.INSTANCE);
 		backend3.initializeForJob(new DummyEnvironment("test", 3, 2), "dummy_3", StringSerializer.INSTANCE);
 
-		ValueState<String> s1State = backend1.createValueState(StringSerializer.INSTANCE, new ValueStateDescriptor<>("a1", null, StringSerializer.INSTANCE));
-		ValueState<String> s2State = backend2.createValueState(StringSerializer.INSTANCE, new ValueStateDescriptor<>("a2", null, StringSerializer.INSTANCE));
-		ValueState<String> s3State = backend3.createValueState(StringSerializer.INSTANCE, new ValueStateDescriptor<>("a3", null, StringSerializer.INSTANCE));
+		ValueState<String> s1State = backend1.createValueState(StringSerializer.INSTANCE, 
+				new ValueStateDescriptor<>("a1", StringSerializer.INSTANCE, null));
+		
+		ValueState<String> s2State = backend2.createValueState(StringSerializer.INSTANCE, 
+				new ValueStateDescriptor<>("a2", StringSerializer.INSTANCE, null));
+		
+		ValueState<String> s3State = backend3.createValueState(StringSerializer.INSTANCE, 
+				new ValueStateDescriptor<>("a3", StringSerializer.INSTANCE, null));
 
 		LazyDbValueState<?, ?, ?> s1 = (LazyDbValueState<?, ?, ?>) s1State;
 		LazyDbValueState<?, ?, ?> s2 = (LazyDbValueState<?, ?, ?>) s2State;
@@ -520,7 +524,7 @@ public class DbStateBackendTest {
 		backend.initializeForJob(env, "dummy_test_caching", IntSerializer.INSTANCE);
 
 		ValueState<String> state = backend.createValueState(IntSerializer.INSTANCE,
-			new ValueStateDescriptor<>("state1", "a", StringSerializer.INSTANCE));
+			new ValueStateDescriptor<>("state1", StringSerializer.INSTANCE, "a"));
 
 		LazyDbValueState<Integer, Integer, String> kv = (LazyDbValueState<Integer, Integer, String>) state;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index e391126..9f266d3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,82 +6,68 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.common.state;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
-import static java.util.Objects.requireNonNull;
-
 /**
- * {@link StateDescriptor} for {@link ListState}. This can be used to create a partitioned
+ * A {@link StateDescriptor} for {@link ListState}. This can be used to create a partitioned
  * list state using
- * {@link org.apache.flink.api.common.functions.RuntimeContext#getPartitionedState(StateDescriptor)}.
+ * {@link org.apache.flink.api.common.functions.RuntimeContext#getListState(ListStateDescriptor)}.
  *
  * @param <T> The type of the values that can be added to the list state.
  */
-public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>> {
+public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
 	private static final long serialVersionUID = 1L;
 
-	private final TypeSerializer<T> serializer;
-
 	/**
-	 * Creates a new {@code ListStateDescriptor} with the given name.
+	 * Creates a new {@code ListStateDescriptor} with the given name and list element type.
+	 *
+	 * <p>If this constructor fails (because it is not possible to describe the type via a class),
+	 * consider using the {@link #ListStateDescriptor(String, TypeInformation)} constructor.
 	 *
 	 * @param name The (unique) name for the state.
-	 * @param serializer {@link TypeSerializer} for the state values.
+	 * @param typeClass The type of the values in the state.
 	 */
-	public ListStateDescriptor(String name, TypeSerializer<T> serializer) {
-		super(requireNonNull(name));
-		this.serializer = requireNonNull(serializer);
-	}
-
-	@Override
-	public ListState<T> bind(StateBackend stateBackend) throws Exception {
-		return stateBackend.createListState(this);
+	public ListStateDescriptor(String name, Class<T> typeClass) {
+		super(name, typeClass, null);
 	}
 
 	/**
-	 * Returns the {@link TypeSerializer} that can be used to serialize the value in the state.
+	 * Creates a new {@code ListStateDescriptor} with the given name and list element type.
+	 *
+	 * @param name The (unique) name for the state.
+	 * @param typeInfo The type of the values in the state.
 	 */
-	public TypeSerializer<T> getSerializer() {
-		return serializer;
+	public ListStateDescriptor(String name, TypeInformation<T> typeInfo) {
+		super(name, typeInfo, null);
 	}
 
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
-
-		return serializer.equals(that.serializer) && name.equals(that.name);
-
-	}
-
-	@Override
-	public int hashCode() {
-		int result = serializer.hashCode();
-		result = 31 * result + name.hashCode();
-		return result;
+	/**
+	 * Creates a new {@code ListStateDescriptor} with the given name and list element type.
+	 *
+	 * @param name The (unique) name for the state.
+	 * @param typeSerializer The type serializer for the list values.
+	 */
+	public ListStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
+		super(name, typeSerializer, null);
 	}
+	
+	// ------------------------------------------------------------------------
 
 	@Override
-	public String toString() {
-		return "ListStateDescriptor{" +
-			"serializer=" + serializer +
-			'}';
+	public ListState<T> bind(StateBackend stateBackend) throws Exception {
+		return stateBackend.createListState(this);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
index 7153a05..1ef65a3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.state;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 import static java.util.Objects.requireNonNull;
@@ -27,80 +28,70 @@ import static java.util.Objects.requireNonNull;
 /**
  * {@link StateDescriptor} for {@link ReducingState}. This can be used to create partitioned
  * reducing state using
- * {@link org.apache.flink.api.common.functions.RuntimeContext#getPartitionedState(StateDescriptor)}.
+ * {@link org.apache.flink.api.common.functions.RuntimeContext#getReducingState(ReducingStateDescriptor)}.
  *
  * @param <T> The type of the values that can be added to the list state.
  */
-public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>> {
+public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>, T> {
 	private static final long serialVersionUID = 1L;
-
-	private final TypeSerializer<T> serializer;
-
+	
+	
 	private final ReduceFunction<T> reduceFunction;
 
 	/**
-	 * Creates a new {@code ReducingStateDescriptor} with the given name and reduce function.
+	 * Creates a new {@code ReducingStateDescriptor} with the given name, type, and default value.
+	 *
+	 * <p>If this constructor fails (because it is not possible to describe the type via a class),
+	 * consider using the {@link #ReducingStateDescriptor(String, ReduceFunction, TypeInformation)} constructor.
 	 *
 	 * @param name The (unique) name for the state.
-	 * @param serializer {@link TypeSerializer} for the state values.
+	 * @param reduceFunction The {@code ReduceFunction} used to aggregate the state.   
+	 * @param typeClass The type of the values in the state.
 	 */
-	public ReducingStateDescriptor(String name,
-			ReduceFunction<T> reduceFunction,
-			TypeSerializer<T> serializer) {
-		super(requireNonNull(name));
+	public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {
+		super(name, typeClass, null);
+		this.reduceFunction = requireNonNull(reduceFunction);
+
 		if (reduceFunction instanceof RichFunction) {
 			throw new UnsupportedOperationException("ReduceFunction of ReducingState can not be a RichFunction.");
 		}
-		this.serializer = requireNonNull(serializer);
-		this.reduceFunction = reduceFunction;
-	}
-
-	@Override
-	public ReducingState<T> bind(StateBackend stateBackend) throws Exception {
-		return stateBackend.createReducingState(this);
 	}
 
 	/**
-	 * Returns the {@link TypeSerializer} that can be used to serialize the value in the state.
+	 * Creates a new {@code ReducingStateDescriptor} with the given name and default value.
+	 *
+	 * @param name The (unique) name for the state.
+	 * @param reduceFunction The {@code ReduceFunction} used to aggregate the state.
+	 * @param typeInfo The type of the values in the state.
 	 */
-	public TypeSerializer<T> getSerializer() {
-		return serializer;
+	public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, TypeInformation<T> typeInfo) {
+		super(name, typeInfo, null);
+		this.reduceFunction = requireNonNull(reduceFunction);
 	}
 
 	/**
-	 * Returns the reduce function to be used for the reducing state.
+	 * Creates a new {@code ValueStateDescriptor} with the given name and default value.
+	 *
+	 * @param name The (unique) name for the state.
+	 * @param reduceFunction The {@code ReduceFunction} used to aggregate the state.
+	 * @param typeSerializer The type serializer of the values in the state.
 	 */
-	public ReduceFunction<T> getReduceFunction() {
-		return reduceFunction;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		ReducingStateDescriptor<?> that = (ReducingStateDescriptor<?>) o;
-
-		return serializer.equals(that.serializer) && name.equals(that.name);
-
+	public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, TypeSerializer<T> typeSerializer) {
+		super(name, typeSerializer, null);
+		this.reduceFunction = requireNonNull(reduceFunction);
 	}
 
+	// ------------------------------------------------------------------------
+	
 	@Override
-	public int hashCode() {
-		int result = serializer.hashCode();
-		result = 31 * result + name.hashCode();
-		return result;
+	public ReducingState<T> bind(StateBackend stateBackend) throws Exception {
+		return stateBackend.createReducingState(this);
 	}
 
-	@Override
-	public String toString() {
-		return "ReducingStateDescriptor{" +
-			"serializer=" + serializer +
-			", reduceFunction=" + reduceFunction +
-			'}';
+	/**
+	 * Returns the reduce function to be used for the reducing state.
+	 */
+	public ReduceFunction<T> getReduceFunction() {
+		return reduceFunction;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
index 5a7650e..b97658b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
index d5adf9b..8c7c608 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,15 +6,16 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.common.state;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 737133f..38087fc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,17 +6,30 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.common.state;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 
 import static java.util.Objects.requireNonNull;
@@ -30,21 +43,80 @@ import static java.util.Objects.requireNonNull;
  *
  * @param <S> The type of the State objects created from this {@code StateDescriptor}.
  */
-public abstract class StateDescriptor<S extends State> implements Serializable {
+public abstract class StateDescriptor<S extends State, T> implements Serializable {
 	private static final long serialVersionUID = 1L;
 
 	/** Name that uniquely identifies state created from this StateDescriptor. */
-	protected final String name;
+	private final String name;
+
+	/** The serializer for the type. May be eagerly initialized in the constructor,
+	 * or lazily once the type is serialized or an ExecutionConfig is provided. */
+	private TypeSerializer<T> serializer;
+
+	/** The default value returned by the state when no other value is bound to a key */
+	private transient T defaultValue;
 
+	/** The type information describing the value type. Only used to lazily create the serializer
+	 * and dropped during serialization */
+	private transient TypeInformation<T> typeInfo;
+	
+	// ------------------------------------------------------------------------
+	
 	/**
-	 * Create a new {@code StateDescriptor} with the given name.
+	 * Create a new {@code StateDescriptor} with the given name and the given type serializer.
+	 * 
 	 * @param name The name of the {@code StateDescriptor}.
+	 * @param serializer The type serializer for the values in the state.
+	 * @param defaultValue The default value that will be set when requesting state without setting
+	 *                     a value before.
 	 */
-	public StateDescriptor(String name) {
-		this.name = requireNonNull(name);
+	protected StateDescriptor(String name, TypeSerializer<T> serializer, T defaultValue) {
+		this.name = requireNonNull(name, "name must not be null");
+		this.serializer = requireNonNull(serializer, "serializer must not be null");
+		this.defaultValue = defaultValue;
 	}
 
 	/**
+	 * Create a new {@code StateDescriptor} with the given name and the given type information.
+	 *
+	 * @param name The name of the {@code StateDescriptor}.
+	 * @param typeInfo The type information for the values in the state.
+	 * @param defaultValue The default value that will be set when requesting state without setting
+	 *                     a value before.   
+	 */
+	protected StateDescriptor(String name, TypeInformation<T> typeInfo, T defaultValue) {
+		this.name = requireNonNull(name, "name must not be null");
+		this.typeInfo = requireNonNull(typeInfo, "type information must not be null");
+		this.defaultValue = defaultValue;
+	}
+
+	/**
+	 * Create a new {@code StateDescriptor} with the given name and the given type information.
+	 * 
+	 * <p>If this constructor fails (because it is not possible to describe the type via a class),
+	 * consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor.
+	 *
+	 * @param name The name of the {@code StateDescriptor}.
+	 * @param type The class of the type of values in the state.
+	 * @param defaultValue The default value that will be set when requesting state without setting
+	 *                     a value before.   
+	 */
+	protected StateDescriptor(String name, Class<T> type, T defaultValue) {
+		this.name = requireNonNull(name, "name must not be null");
+		requireNonNull(type, "type class must not be null");
+		
+		try {
+			this.typeInfo = TypeExtractor.createTypeInfo(type);
+		} catch (Exception e) {
+			throw new RuntimeException("Cannot create full type information based on the given class. If the type has generics, please", e);
+		}
+
+		this.defaultValue = defaultValue;
+	}
+
+	// ------------------------------------------------------------------------
+	
+	/**
 	 * Returns the name of this {@code StateDescriptor}.
 	 */
 	public String getName() {
@@ -52,15 +124,179 @@ public abstract class StateDescriptor<S extends State> implements Serializable {
 	}
 
 	/**
+	 * Returns the default value.
+	 */
+	public T getDefaultValue() {
+		if (defaultValue != null) {
+			if (serializer != null) {
+				return serializer.copy(defaultValue);
+			} else {
+				throw new IllegalStateException("Serializer not yet initialized.");
+			}
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Returns the {@link TypeSerializer} that can be used to serialize the value in the state.
+	 * Note that the serializer may initialized lazily and is only guaranteed to exist after
+	 * calling {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
+	 */
+	public TypeSerializer<T> getSerializer() {
+		if (serializer != null) {
+			return serializer;
+		} else {
+			throw new IllegalStateException("Serializer not yet initialized.");
+		}
+	}
+	
+	/**
 	 * Creates a new {@link State} on the given {@link StateBackend}.
 	 *
 	 * @param stateBackend The {@code StateBackend} on which to create the {@link State}.
 	 */
-	public abstract S bind(StateBackend stateBackend) throws Exception ;
+	public abstract S bind(StateBackend stateBackend) throws Exception;
+	
+	// ------------------------------------------------------------------------
 
-	// Force subclasses to implement
-	public abstract boolean equals(Object o);
+	/**
+	 * Checks whether the serializer has been initialized. Serializer initialization is lazy,
+	 * to allow parametrization of serializers with an {@link ExecutionConfig} via
+	 * {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
+	 * 
+	 * @return True if the serializers have been initialized, false otherwise.
+	 */
+	public boolean isSerializerInitialized() {
+		return serializer != null;
+	}
+
+	/**
+	 * Initializes the serializer, unless it has been initialized before.
+	 * 
+	 * @param executionConfig The execution config to use when creating the serializer.
+	 */
+	public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
+		if (serializer == null) {
+			if (typeInfo != null) {
+				serializer = typeInfo.createSerializer(executionConfig);
+			} else {
+				throw new IllegalStateException(
+						"Cannot initialize serializer after TypeInformation was dropped during serialization");
+			}
+		}
+	}
+	
+	/**
+	 * This method should be called by subclasses prior to serialization. Because the TypeInformation is
+	 * not always serializable, it is 'transient' and dropped during serialization. Hence, the descriptor
+	 * needs to make sure that the serializer is created before the TypeInformation is dropped. 
+	 */
+	private void ensureSerializerCreated() {
+		if (serializer == null) {
+			if (typeInfo != null) {
+				serializer = typeInfo.createSerializer(new ExecutionConfig());
+			} else {
+				throw new IllegalStateException(
+						"Cannot initialize serializer after TypeInformation was dropped during serialization");
+			}
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Standard Utils
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return name.hashCode() + 41;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		else if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		else {
+			StateDescriptor<?, ?> that = (StateDescriptor<?, ?>) o;
+			return this.name.equals(that.name);
+		}
+	}
 
-	// Force subclasses to implement
-	public abstract int hashCode();
+	@Override
+	public String toString() {
+		return getClass().getSimpleName() + 
+				"{ name=" + name +
+				", defaultValue=" + defaultValue +
+				", serializer=" + serializer +
+				'}';
+	}
+
+	// ------------------------------------------------------------------------
+	//  Serialization
+	// ------------------------------------------------------------------------
+
+	private void writeObject(final ObjectOutputStream out) throws IOException {
+		// make sure we have a serializer before the type information gets lost
+		ensureSerializerCreated();
+
+		// write all the non-transient fields
+		out.defaultWriteObject();
+
+		// write the non-serializable default value field
+		if (defaultValue == null) {
+			// we don't have a default value
+			out.writeBoolean(false);
+		} else {
+			// we have a default value
+			out.writeBoolean(true);
+
+			byte[] serializedDefaultValue;
+			try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
+					DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos))
+			{
+				serializer.serialize(defaultValue, outView);
+				outView.flush();
+				serializedDefaultValue = baos.toByteArray();
+			}
+			catch (Exception e) {
+				throw new IOException("Unable to serialize default value of type " +
+						defaultValue.getClass().getSimpleName() + ".", e);
+			}
+
+			out.writeInt(serializedDefaultValue.length);
+			out.write(serializedDefaultValue);
+		}
+	}
+
+	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+		// read the non-transient fields
+		in.defaultReadObject();
+
+		// read the default value field
+		boolean hasDefaultValue = in.readBoolean();
+		if (hasDefaultValue) {
+			int size = in.readInt();
+			byte[] buffer = new byte[size];
+			int bytesRead = in.read(buffer);
+
+			if (bytesRead != size) {
+				throw new RuntimeException("Read size does not match expected size.");
+			}
+
+			try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
+					DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais))
+			{
+				defaultValue = serializer.deserialize(inView);
+			}
+			catch (Exception e) {
+				throw new IOException("Unable to deserialize default value.", e);
+			}
+		} else {
+			defaultValue = null;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
index bcfa46f..f949c57 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,161 +6,75 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.common.state;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import static java.util.Objects.requireNonNull;
 
 /**
  * {@link StateDescriptor} for {@link ValueState}. This can be used to create partitioned
  * value state using
- * {@link org.apache.flink.api.common.functions.RuntimeContext#getPartitionedState(StateDescriptor)}.
+ * {@link org.apache.flink.api.common.functions.RuntimeContext#getState(ValueStateDescriptor)}.
  *
  * @param <T> The type of the values that the value state can hold.
  */
-public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>> {
+public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
 	private static final long serialVersionUID = 1L;
-
-	private transient T defaultValue;
-
-	private final TypeSerializer<T> serializer;
-
+	
 	/**
-	 * Creates a new {@code ValueStateDescriptor} with the given name and default value.
-	 *
+	 * Creates a new {@code ValueStateDescriptor} with the given name, type, and default value.
+	 * 
+	 * <p>If this constructor fails (because it is not possible to describe the type via a class),
+	 * consider using the {@link #ValueStateDescriptor(String, TypeInformation, Object)} constructor.
+	 * 
 	 * @param name The (unique) name for the state.
+	 * @param typeClass The type of the values in the state.   
 	 * @param defaultValue The default value that will be set when requesting state without setting
 	 *                     a value before.
-	 * @param serializer {@link TypeSerializer} for the state values.
 	 */
-	public ValueStateDescriptor(String name, T defaultValue, TypeSerializer<T> serializer) {
-		super(requireNonNull(name));
-		this.defaultValue = defaultValue;
-		this.serializer = requireNonNull(serializer);
-	}
-
-	private void writeObject(final ObjectOutputStream out) throws IOException {
-		out.defaultWriteObject();
-
-		if (defaultValue == null) {
-			// we don't have a default value
-			out.writeBoolean(false);
-		} else {
-			out.writeBoolean(true);
-
-			ByteArrayOutputStream baos = new ByteArrayOutputStream();
-			DataOutputViewStreamWrapper outView =
-					new DataOutputViewStreamWrapper(new DataOutputStream(baos));
-
-			try {
-				serializer.serialize(defaultValue, outView);
-			} catch (IOException ioe) {
-				throw new RuntimeException("Unable to serialize default value of type " +
-						defaultValue.getClass().getSimpleName() + ".", ioe);
-			}
-
-			outView.close();
-
-			out.writeInt(baos.size());
-			out.write(baos.toByteArray());
-		}
-
-	}
-
-	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-
-		boolean hasDefaultValue = in.readBoolean();
-
-		if (hasDefaultValue) {
-			int size = in.readInt();
-			byte[] buffer = new byte[size];
-			int bytesRead = in.read(buffer);
-
-			if (bytesRead != size) {
-				throw new RuntimeException("Read size does not match expected size.");
-			}
-
-			ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
-			DataInputViewStreamWrapper inView =
-					new DataInputViewStreamWrapper(new DataInputStream(bais));
-			defaultValue = serializer.deserialize(inView);
-		} else {
-			defaultValue = null;
-		}
-	}
-
-	@Override
-	public ValueState<T> bind(StateBackend stateBackend) throws Exception {
-		return stateBackend.createValueState(this);
+	public ValueStateDescriptor(String name, Class<T> typeClass, T defaultValue) {
+		super(name, typeClass, defaultValue);
 	}
 
 	/**
-	 * Returns the default value.
+	 * Creates a new {@code ValueStateDescriptor} with the given name and default value.
+	 *
+	 * @param name The (unique) name for the state.
+	 * @param typeInfo The type of the values in the state.
+	 * @param defaultValue The default value that will be set when requesting state without setting
+	 *                     a value before.
 	 */
-	public T getDefaultValue() {
-		if (defaultValue != null) {
-			return serializer.copy(defaultValue);
-		} else {
-			return null;
-		}
+	public ValueStateDescriptor(String name, TypeInformation<T> typeInfo, T defaultValue) {
+		super(name, typeInfo, defaultValue);
 	}
 
 	/**
-	 * Returns the {@link TypeSerializer} that can be used to serialize the value in the state.
+	 * Creates a new {@code ValueStateDescriptor} with the given name, default value, and the specific
+	 * serializer.
+	 *
+	 * @param name The (unique) name for the state.
+	 * @param typeSerializer The type serializer of the values in the state.
+	 * @param defaultValue The default value that will be set when requesting state without setting
+	 *                     a value before.
 	 */
-	public TypeSerializer<T> getSerializer() {
-		return serializer;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		ValueStateDescriptor<?> that = (ValueStateDescriptor<?>) o;
-
-		return serializer.equals(that.serializer) && name.equals(that.name);
-
-	}
-
-	@Override
-	public int hashCode() {
-		int result = serializer.hashCode();
-		result = 31 * result + name.hashCode();
-		return result;
+	public ValueStateDescriptor(String name, TypeSerializer<T> typeSerializer, T defaultValue) {
+		super(name, typeSerializer, defaultValue);
 	}
 
+	// ------------------------------------------------------------------------
+	
 	@Override
-	public String toString() {
-		return "ValueStateDescriptor{" +
-				"name=" + name +
-				", defaultValue=" + defaultValue +
-				", serializer=" + serializer +
-				'}';
+	public ValueState<T> bind(StateBackend stateBackend) throws Exception {
+		return stateBackend.createValueState(this);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
new file mode 100644
index 0000000..6dc00f0
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ListStateDescriptorTest {
+	
+	@Test
+	public void testValueStateDescriptorEagerSerializer() throws Exception {
+
+		TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
+		
+		ListStateDescriptor<String> descr = 
+				new ListStateDescriptor<String>("testName", serializer);
+		
+		assertEquals("testName", descr.getName());
+		assertNotNull(descr.getSerializer());
+		assertEquals(serializer, descr.getSerializer());
+
+		ListStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
+
+		assertEquals("testName", copy.getName());
+		assertNotNull(copy.getSerializer());
+		assertEquals(serializer, copy.getSerializer());
+	}
+
+	@Test
+	public void testValueStateDescriptorLazySerializer() throws Exception {
+		// some different registered value
+		ExecutionConfig cfg = new ExecutionConfig();
+		cfg.registerKryoType(TaskInfo.class);
+
+		ListStateDescriptor<Path> descr =
+				new ListStateDescriptor<Path>("testName", Path.class);
+		
+		try {
+			descr.getSerializer();
+			fail("should cause an exception");
+		} catch (IllegalStateException ignored) {}
+
+		descr.initializeSerializerUnlessSet(cfg);
+		
+		assertNotNull(descr.getSerializer());
+		assertTrue(descr.getSerializer() instanceof KryoSerializer);
+
+		assertTrue(((KryoSerializer<?>) descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
+	}
+
+	@Test
+	public void testValueStateDescriptorAutoSerializer() throws Exception {
+
+		ListStateDescriptor<String> descr =
+				new ListStateDescriptor<String>("testName", String.class);
+
+		ListStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
+
+		assertEquals("testName", copy.getName());
+		assertNotNull(copy.getSerializer());
+		assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
new file mode 100644
index 0000000..0bac930
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+public class ReducingStateDescriptorTest {
+	
+	@Test
+	public void testValueStateDescriptorEagerSerializer() throws Exception {
+
+		@SuppressWarnings("unchecked")
+		ReduceFunction<String> reducer = mock(ReduceFunction.class); 
+		
+		TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
+		
+		ReducingStateDescriptor<String> descr = 
+				new ReducingStateDescriptor<String>("testName", reducer, serializer);
+		
+		assertEquals("testName", descr.getName());
+		assertNotNull(descr.getSerializer());
+		assertEquals(serializer, descr.getSerializer());
+
+		ReducingStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
+
+		assertEquals("testName", copy.getName());
+		assertNotNull(copy.getSerializer());
+		assertEquals(serializer, copy.getSerializer());
+	}
+
+	@Test
+	public void testValueStateDescriptorLazySerializer() throws Exception {
+
+		@SuppressWarnings("unchecked")
+		ReduceFunction<Path> reducer = mock(ReduceFunction.class);
+		
+		// some different registered value
+		ExecutionConfig cfg = new ExecutionConfig();
+		cfg.registerKryoType(TaskInfo.class);
+
+		ReducingStateDescriptor<Path> descr =
+				new ReducingStateDescriptor<Path>("testName", reducer, Path.class);
+
+		try {
+			descr.getSerializer();
+			fail("should cause an exception");
+		} catch (IllegalStateException ignored) {}
+
+		descr.initializeSerializerUnlessSet(cfg);
+		
+		assertNotNull(descr.getSerializer());
+		assertTrue(descr.getSerializer() instanceof KryoSerializer);
+
+		assertTrue(((KryoSerializer<?>) descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
+	}
+
+	@Test
+	public void testValueStateDescriptorAutoSerializer() throws Exception {
+
+		@SuppressWarnings("unchecked")
+		ReduceFunction<String> reducer = mock(ReduceFunction.class);
+
+		ReducingStateDescriptor<String> descr =
+				new ReducingStateDescriptor<String>("testName", reducer, String.class);
+
+		ReducingStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
+
+		assertEquals("testName", copy.getName());
+		assertNotNull(copy.getSerializer());
+		assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
new file mode 100644
index 0000000..d03cc47
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ValueStateDescriptorTest {
+	
+	@Test
+	public void testValueStateDescriptorEagerSerializer() throws Exception {
+
+		TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
+		String defaultValue = "le-value-default";
+		
+		ValueStateDescriptor<String> descr = 
+				new ValueStateDescriptor<String>("testName", serializer, defaultValue);
+		
+		assertEquals("testName", descr.getName());
+		assertEquals(defaultValue, descr.getDefaultValue());
+		assertNotNull(descr.getSerializer());
+		assertEquals(serializer, descr.getSerializer());
+
+		ValueStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
+
+		assertEquals("testName", copy.getName());
+		assertEquals(defaultValue, copy.getDefaultValue());
+		assertNotNull(copy.getSerializer());
+		assertEquals(serializer, copy.getSerializer());
+	}
+
+	@Test
+	public void testValueStateDescriptorLazySerializer() throws Exception {
+		
+		// some default value that goes to the generic serializer
+		Path defaultValue = new Path(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).toURI());
+		
+		// some different registered value
+		ExecutionConfig cfg = new ExecutionConfig();
+		cfg.registerKryoType(TaskInfo.class);
+
+		ValueStateDescriptor<Path> descr =
+				new ValueStateDescriptor<Path>("testName", Path.class, defaultValue);
+
+		try {
+			descr.getSerializer();
+			fail("should cause an exception");
+		} catch (IllegalStateException ignored) {}
+
+		descr.initializeSerializerUnlessSet(cfg);
+		
+		assertNotNull(descr.getSerializer());
+		assertTrue(descr.getSerializer() instanceof KryoSerializer);
+
+		assertTrue(((KryoSerializer<?>) descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
+	}
+
+	@Test
+	public void testValueStateDescriptorAutoSerializer() throws Exception {
+		
+		String defaultValue = "le-value-default";
+
+		ValueStateDescriptor<String> descr =
+				new ValueStateDescriptor<String>("testName", String.class, defaultValue);
+
+		ValueStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
+
+		assertEquals("testName", copy.getName());
+		assertEquals(defaultValue, copy.getDefaultValue());
+		assertNotNull(copy.getSerializer());
+		assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index dafe86f..bd82800 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -17,10 +17,8 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -103,8 +101,8 @@ public class SessionWindowing {
 
 		private final Long sessionTimeout;
 
-		private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("last-seen", -1L,
-			BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()));
+		private final ValueStateDescriptor<Long> stateDesc = 
+				new ValueStateDescriptor<>("last-seen", Long.class, -1L);
 
 
 		public SessionTrigger(Long sessionTimeout) {

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java
index 206be64..8e77752 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java
@@ -39,7 +39,7 @@ import static java.util.Objects.requireNonNull;
  * @param <SD> The type of StateDescriptor for the State S
  * @param <Backend> The type of the backend that snapshots this key/value state.
  */
-public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend>
+public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
 		implements KvState<K, N, S, SD, Backend>, State {
 
 	/** Map containing the actual key/value pairs */

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 958b4dc..e989af3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -178,12 +179,16 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
 	 */
 	@SuppressWarnings({"rawtypes", "unchecked"})
-	public <K, N, S extends State> S getPartitionedState(final N namespace, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S> stateDescriptor) throws Exception {
+	public <K, N, S extends State> S getPartitionedState(final N namespace, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {
 
 		if (keySerializer == null) {
 			throw new Exception("State key serializer has not been configured in the config. " +
 					"This operation cannot use partitioned state.");
 		}
+		
+		if (!stateDescriptor.isSerializerInitialized()) {
+			stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
+		}
 
 		if (keyValueStatesByName == null) {
 			keyValueStatesByName = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
index 7a97dc0..89de000 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java
@@ -35,7 +35,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
  * @param <SD> The type of the {@link StateDescriptor} for state {@code S}.
  * @param <Backend> The type of {@link AbstractStateBackend} that manages this {@code KvState}.
  */
-public interface KvState<K, N, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend> {
+public interface KvState<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> {
 
 	/**
 	 * Sets the current key, which will be used when using the state access methods.

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
index ce72135..245427e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
@@ -39,7 +39,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
  * @param <SD> The type of the {@link StateDescriptor}
  * @param <Backend> The type of the backend that can restore the state from this snapshot.
  */
-public interface KvStateSnapshot<K, N, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend> extends java.io.Serializable {
+public interface KvStateSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> extends java.io.Serializable {
 
 	/**
 	 * Loads the key/value state back from this snapshot.

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java
index 5035953..3cae629 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsState.java
@@ -41,7 +41,7 @@ import java.util.Map;
  * @param <S> The type of State
  * @param <SD> The type of StateDescriptor for the State S
  */
-public abstract class AbstractFsState<K, N, SV, S extends State, SD extends StateDescriptor<S>>
+public abstract class AbstractFsState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
 		extends AbstractHeapState<K, N, SV, S, SD, FsStateBackend> {
 
 	/** The file system state backend backing snapshots of this state */

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
index c1e0f12..432a9e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
@@ -39,7 +39,7 @@ import java.util.Map;
  * @param <N> The type of the namespace in the snapshot state.
  * @param <SV> The type of the state value.
  */
-public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S>> extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD, FsStateBackend> {
+public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD, FsStateBackend> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java
index 816c883..cae673d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemState.java
@@ -39,7 +39,7 @@ import java.util.Map;
  * @param <S> The type of State
  * @param <SD> The type of StateDescriptor for the State S
  */
-public abstract class AbstractMemState<K, N, SV, S extends State, SD extends StateDescriptor<S>>
+public abstract class AbstractMemState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
 		extends AbstractHeapState<K, N, SV, S, SD, MemoryStateBackend> {
 
 	public AbstractMemState(TypeSerializer<K> keySerializer,

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
index d2efd53..5d4f0d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
@@ -36,7 +36,7 @@ import java.util.Map;
  * @param <N> The type of the namespace in the snapshot state.
  * @param <SV> The type of the value in the snapshot state.
  */
-public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S>> implements KvStateSnapshot<K, N, S, SD, MemoryStateBackend> {
+public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> implements KvStateSnapshot<K, N, S, SD, MemoryStateBackend> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 82ab3b3..20a46a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.state;
 
 import com.google.common.base.Joiner;
+
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -29,11 +31,10 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.types.IntValue;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -67,7 +68,9 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
 
-			ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", null, StringSerializer.INSTANCE);
+			ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
+			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+		
 			ValueState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
 			@SuppressWarnings("unchecked")
@@ -149,7 +152,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 		try {
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
 
-			ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", StringSerializer.INSTANCE);
+			ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
 			ListState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
 			@SuppressWarnings("unchecked")
@@ -246,7 +249,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 						return value1 + "," + value2;
 					}
 				},
-				StringSerializer.INSTANCE);
+				String.class);
 			ReducingState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
 			@SuppressWarnings("unchecked")
@@ -336,12 +339,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 				"test_op",
 				IntSerializer.INSTANCE);
 
-			ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id",
-				null,
-				StringSerializer.INSTANCE);
-			ValueState<String> state = backend.getPartitionedState(null,
-				VoidSerializer.INSTANCE,
-				kvId);
+			ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
+			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+			
+			ValueState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
 			@SuppressWarnings("unchecked")
 			KvState<Integer, Void, ValueState<String>, ValueStateDescriptor<String>, B> kv =
@@ -379,7 +380,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 		try {
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
 
-			ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", StringSerializer.INSTANCE);
+			ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
 			ListState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
 			@SuppressWarnings("unchecked")
@@ -427,7 +428,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 						return value1 + "," + value2;
 					}
 				},
-				StringSerializer.INSTANCE);
+				String.class);
 			ReducingState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
 			@SuppressWarnings("unchecked")
@@ -468,7 +469,9 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 		try {
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
 
-			ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", new IntValue(-1), IntValueSerializer.INSTANCE);
+			ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
+			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+			
 			ValueState<IntValue> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
 
 			@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index f8f26b5..6cb04b3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -254,7 +254,7 @@ public abstract class AbstractStreamOperator<OUT>
 	 * @throws IllegalStateException Thrown, if the key/value state was already initialized.
 	 * @throws Exception Thrown, if the state backend cannot create the key/value state.
 	 */
-	protected <S extends State> S getPartitionedState(StateDescriptor<S> stateDescriptor) throws Exception {
+	protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
 		return getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, stateDescriptor);
 	}
 
@@ -265,7 +265,7 @@ public abstract class AbstractStreamOperator<OUT>
 	 * @throws Exception Thrown, if the state backend cannot create the key/value state.
 	 */
 	@SuppressWarnings("unchecked")
-	protected <S extends State, N> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S> stateDescriptor) throws Exception {
+	protected <S extends State, N> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
 		return getStateBackend().getPartitionedState(namespace, (TypeSerializer<Object>) namespaceSerializer,
 			stateDescriptor);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index e627ec8..0b80884 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -64,10 +64,13 @@ public class StreamGroupedFold<IN, OUT, KEY>
 					"operator. Probably the setOutputType method was not called.");
 		}
 
-		ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
-		DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
-		initialValue = outTypeSerializer.deserialize(in);
-		ValueStateDescriptor<OUT> stateId = new ValueStateDescriptor<>(STATE_NAME, null, outTypeSerializer);
+		try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
+			DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais))
+		{
+			initialValue = outTypeSerializer.deserialize(in);
+		}
+		
+		ValueStateDescriptor<OUT> stateId = new ValueStateDescriptor<>(STATE_NAME, outTypeSerializer, null);
 		values = getPartitionedState(stateId);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index c054563..2dd7762 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -44,7 +44,7 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc
 	@Override
 	public void open() throws Exception {
 		super.open();
-		ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, null, serializer);
+		ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer, null);
 		values = getPartitionedState(stateId);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index f99ab93..c9cc4a7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -112,6 +112,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 	public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
 		requireNonNull(stateProperties, "The state properties must not be null");
 		try {
+			stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
 			return operator.getPartitionedState(stateProperties);
 		} catch (Exception e) {
 			throw new RuntimeException("Error while getting state", e);
@@ -122,6 +123,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 	public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
 		requireNonNull(stateProperties, "The state properties must not be null");
 		try {
+			stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
 			return operator.getPartitionedState(stateProperties);
 		} catch (Exception e) {
 			throw new RuntimeException("Error while getting state", e);
@@ -132,6 +134,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 	public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
 		requireNonNull(stateProperties, "The state properties must not be null");
 		try {
+			stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
 			return operator.getPartitionedState(stateProperties);
 		} catch (Exception e) {
 			throw new RuntimeException("Error while getting state", e);
@@ -163,7 +166,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 		requireNonNull(stateType, "The state type information must not be null");
 
 		ValueStateDescriptor<S> stateProps = 
-				new ValueStateDescriptor<>(name, defaultState, stateType.createSerializer(getExecutionConfig()));
+				new ValueStateDescriptor<>(name, stateType, defaultState);
 		return getState(stateProps);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 21e35db..02a935c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -18,10 +18,10 @@
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
+
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
@@ -38,8 +38,8 @@ public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Obj
 
 	private final long interval;
 
-	private final ValueStateDescriptor<Boolean> stateDesc = new ValueStateDescriptor<>("first", true,
-		BasicTypeInfo.BOOLEAN_TYPE_INFO.createSerializer(new ExecutionConfig()));
+	private final ValueStateDescriptor<Boolean> stateDesc = 
+			new ValueStateDescriptor<>("first", BooleanSerializer.INSTANCE, true);
 
 	private ContinuousEventTimeTrigger(long interval) {
 		this.interval = interval;

http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index 10c975f..25d9508 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,13 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
+
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
@@ -36,8 +37,8 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge
 
 	private final long interval;
 
-	private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("fire-timestamp", 0L,
-		BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()));
+	private final ValueStateDescriptor<Long> stateDesc = 
+			new ValueStateDescriptor<>("fire-timestamp", LongSerializer.INSTANCE, 0L);
 
 
 	private ContinuousProcessingTimeTrigger(long interval) {


Mime
View raw message