flink-commits mailing list archives

From se...@apache.org
Subject [11/16] flink git commit: [FLINK-3201] Enhance Partitioned State Interface with State Types
Date Wed, 03 Feb 2016 20:12:30 GMT
[FLINK-3201] Enhance Partitioned State Interface with State Types

Add new state types ValueState, ListState and ReducingState, where
ListState and ReducingState derive from interface MergingState.

ValueState behaves exactly the same as OperatorState. MergingState is
state to which elements can be added and whose contents can be
retrieved. A ListState actually keeps the list of added elements, while
a ReducingState uses a reduce function to combine all added elements
into a single value. To create a ValueState the user passes a
ValueStateDescriptor to StreamingRuntimeContext.getPartitionedState();
a ListStateDescriptor or ReducingStateDescriptor is passed for the
other state types (see the sketch below).
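
As a rough illustration of the intended usage, a sketch only: the
ValueStateDescriptor(name, defaultValue, serializer) constructor appears
in the diff below, but the ReducingStateDescriptor constructor, the
generic getPartitionedState() signature and the 'streamingContext'
variable are assumptions for illustration, not taken verbatim from this
commit.

    // Sketch: obtaining the new state types via state descriptors.
    ValueStateDescriptor<Long> countDescriptor =
            new ValueStateDescriptor<>("count", 0L, LongSerializer.INSTANCE);
    ValueState<Long> count = streamingContext.getPartitionedState(countDescriptor);
    count.update(count.value() + 1);   // read-modify-write of a single value

    // Assumed (name, reduceFunction, serializer) constructor, mirroring the
    // getReduceFunction()/getSerializer() accessors used in the diff below.
    ReducingStateDescriptor<Long> sumDescriptor = new ReducingStateDescriptor<>(
            "sum",
            new ReduceFunction<Long>() {
                @Override
                public Long reduce(Long a, Long b) {
                    return a + b;
                }
            },
            LongSerializer.INSTANCE);
    ReducingState<Long> sum = streamingContext.getPartitionedState(sumDescriptor);
    sum.add(1L);   // added values are folded into one by the reduce function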

This change is necessary to give the system more information about the
nature of the operator state, which we need in order to do incremental
snapshots. That would not be possible if, for example, the user kept a
List as the value of a state: inside an OperatorState the list would be
opaque, and Flink could not create good incremental snapshots. The
sketch below illustrates the difference.
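
A hedged sketch of that difference; the state method names follow the
description above, while the surrounding helper method and its
exception signature are assumptions.

    // Sketch: why the state type matters for incremental snapshots.
    void recordEvent(ValueState<ArrayList<String>> opaqueList,
                     ListState<String> listState,
                     String event) throws Exception {
        // Opaque: the backend only sees one value, so adding an element means
        // reading, mutating and re-writing the entire list on every update.
        ArrayList<String> list = opaqueList.value();
        if (list == null) {
            list = new ArrayList<>();
        }
        list.add(event);
        opaqueList.update(list);

        // Transparent: the backend sees the individual add() and could record
        // just that delta when taking an incremental snapshot.
        listState.add(event);
    }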

This also refactors the state backend. Before, the logic for
partitioned state was spread out over StreamingRuntimeContext,
AbstractStreamOperator and StateBackend. Now it is consolidated in the
state backend (AbstractStateBackend in this diff), which provides one
factory method per state type, as outlined below.
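
For orientation, the per-backend factory methods introduced by the
refactoring, abridged from the DbStateBackend hunk further down in this
mail; declaring them as protected abstract members of
AbstractStateBackend is an assumption based on the @Override
annotations in that hunk.

    protected abstract <N, T> ValueState<T> createValueState(
            TypeSerializer<N> namespaceSerializer,
            ValueStateDescriptor<T> stateDesc) throws Exception;

    protected abstract <N, T> ListState<T> createListState(
            TypeSerializer<N> namespaceSerializer,
            ListStateDescriptor<T> stateDesc) throws Exception;

    protected abstract <N, T> ReducingState<T> createReducingState(
            TypeSerializer<N> namespaceSerializer,
            ReducingStateDescriptor<T> stateDesc) throws Exception;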

This also adds support for partitioned state in two-input operators.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/caf46728
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/caf46728
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/caf46728

Branch: refs/heads/master
Commit: caf46728045c0b886e6d4ec0aa429a830740a391
Parents: 680c2c3
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Jan 25 12:33:51 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 3 20:27:51 2016 +0100

----------------------------------------------------------------------
 .../contrib/streaming/state/DbStateBackend.java |  77 ++-
 .../contrib/streaming/state/LazyDbKvState.java  | 631 -----------------
 .../streaming/state/LazyDbValueState.java       | 692 +++++++++++++++++++
 .../contrib/streaming/state/MySqlAdapter.java   |   2 +-
 .../streaming/state/DbStateBackendTest.java     | 344 +++++++--
 .../contrib/streaming/state/DerbyAdapter.java   |   1 -
 .../src/test/resources/log4j-test.properties    |   4 +-
 .../api/common/functions/RuntimeContext.java    |  71 +-
 .../util/AbstractRuntimeUDFContext.java         |  17 +-
 .../flink/api/common/state/ListState.java       |  33 +
 .../api/common/state/ListStateDescriptor.java   |  87 +++
 .../flink/api/common/state/MergingState.java    |  66 ++
 .../flink/api/common/state/OperatorState.java   |   1 +
 .../flink/api/common/state/ReducingState.java   |  35 +
 .../common/state/ReducingStateDescriptor.java   | 106 +++
 .../apache/flink/api/common/state/State.java    |  30 +
 .../flink/api/common/state/StateBackend.java    |  50 ++
 .../flink/api/common/state/StateDescriptor.java |  66 ++
 .../flink/api/common/state/ValueState.java      |  69 ++
 .../api/common/state/ValueStateDescriptor.java  | 166 +++++
 .../typeutils/base/TypeSerializerSingleton.java |   2 +-
 .../examples/windowing/SessionWindowing.java    |   6 +-
 .../flink/hdfstests/FileStateBackendTest.java   |  37 +-
 .../runtime/state/AbstractHeapKvState.java      | 146 ----
 .../flink/runtime/state/AbstractHeapState.java  | 164 +++++
 .../runtime/state/AbstractStateBackend.java     | 406 +++++++++++
 .../runtime/state/ArrayListSerializer.java      | 125 ++++
 .../flink/runtime/state/CheckpointListener.java |  37 +
 .../flink/runtime/state/GenericListState.java   | 132 ++++
 .../runtime/state/GenericReducingState.java     | 129 ++++
 .../org/apache/flink/runtime/state/KvState.java |  33 +-
 .../flink/runtime/state/KvStateSnapshot.java    |  25 +-
 .../flink/runtime/state/StateBackend.java       | 220 ------
 .../runtime/state/StateBackendFactory.java      |   4 +-
 .../state/filesystem/AbstractFileState.java     |  95 ---
 .../filesystem/AbstractFileStateHandle.java     |  95 +++
 .../state/filesystem/AbstractFsState.java       |  95 +++
 .../filesystem/AbstractFsStateSnapshot.java     | 136 ++++
 .../filesystem/FileSerializableStateHandle.java |   2 +-
 .../state/filesystem/FileStreamStateHandle.java |   2 +-
 .../runtime/state/filesystem/FsHeapKvState.java |  86 ---
 .../state/filesystem/FsHeapKvStateSnapshot.java | 107 ---
 .../runtime/state/filesystem/FsListState.java   | 140 ++++
 .../state/filesystem/FsReducingState.java       | 149 ++++
 .../state/filesystem/FsStateBackend.java        |  34 +-
 .../runtime/state/filesystem/FsValueState.java  | 126 ++++
 .../runtime/state/memory/AbstractMemState.java  |  82 +++
 .../state/memory/AbstractMemStateSnapshot.java  | 127 ++++
 .../runtime/state/memory/MemHeapKvState.java    |  52 --
 .../runtime/state/memory/MemListState.java      | 111 +++
 .../runtime/state/memory/MemReducingState.java  | 123 ++++
 .../runtime/state/memory/MemValueState.java     | 100 +++
 .../state/memory/MemoryHeapKvStateSnapshot.java | 107 ---
 .../state/memory/MemoryStateBackend.java        |  40 +-
 .../runtime/state/FileStateBackendTest.java     | 280 ++------
 .../FsCheckpointStateOutputStreamTest.java      |   4 +-
 .../runtime/state/MemoryStateBackendTest.java   | 182 +----
 .../runtime/state/StateBackendTestBase.java     | 494 +++++++++++++
 .../streaming/connectors/fs/RollingSink.java    |   4 +-
 .../kafka/FlinkKafkaConsumerBase.java           |   4 +-
 .../connectors/kafka/KafkaConsumerTestBase.java |   4 +-
 .../kafka/testutils/FailingIdentityMapper.java  |   4 +-
 .../kafka/testutils/MockRuntimeContext.java     |  13 +-
 .../api/checkpoint/CheckpointNotifier.java      |  37 -
 .../api/datastream/ConnectedStreams.java        |  15 +
 .../environment/StreamExecutionEnvironment.java |  13 +-
 .../source/MessageAcknowledgingSourceBase.java  |   4 +-
 .../flink/streaming/api/graph/StreamConfig.java |  14 +-
 .../flink/streaming/api/graph/StreamGraph.java  |  19 +-
 .../api/graph/StreamGraphGenerator.java         |  13 +-
 .../flink/streaming/api/graph/StreamNode.java   |  19 +-
 .../api/graph/StreamingJobGraphGenerator.java   |   3 +-
 .../api/operators/AbstractStreamOperator.java   | 208 ++----
 .../operators/AbstractUdfStreamOperator.java    |  11 +-
 .../api/operators/StreamGroupedFold.java        |   8 +-
 .../api/operators/StreamGroupedReduce.java      |   8 +-
 .../streaming/api/operators/StreamOperator.java |   6 +-
 .../api/operators/StreamingRuntimeContext.java  |  80 +--
 .../transformations/TwoInputTransformation.java |  47 ++
 .../triggers/ContinuousEventTimeTrigger.java    |   4 +-
 .../ContinuousProcessingTimeTrigger.java        |   6 +-
 .../api/windowing/triggers/CountTrigger.java    |   4 +-
 .../api/windowing/triggers/DeltaTrigger.java    |   4 +-
 .../api/windowing/triggers/Trigger.java         |   6 +-
 .../runtime/io/StreamInputProcessor.java        |   2 +-
 .../runtime/io/StreamTwoInputProcessor.java     |   2 +
 ...ractAlignedProcessingTimeWindowOperator.java |   4 +-
 .../windowing/NonKeyedWindowOperator.java       |  17 +-
 .../operators/windowing/WindowOperator.java     |  21 +-
 .../streaming/runtime/tasks/OperatorChain.java  |   4 +-
 .../streaming/runtime/tasks/StreamTask.java     |  54 +-
 .../runtime/tasks/StreamTaskState.java          |  13 +-
 .../runtime/tasks/StreamTaskStateList.java      |   4 +-
 .../flink/streaming/api/DataStreamTest.java     |  10 +-
 .../api/operators/co/SelfConnectionTest.java    |   6 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |  28 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |  66 +-
 .../runtime/state/StateBackendITCase.java       |  49 +-
 .../flink/streaming/util/MockContext.java       |  33 +-
 .../util/OneInputStreamOperatorTestHarness.java |  44 +-
 .../util/TwoInputStreamOperatorTestHarness.java |   8 +-
 .../api/scala/StreamExecutionEnvironment.scala  |   7 +-
 .../api/scala/function/StatefulFunction.scala   |   2 +-
 .../EventTimeAllWindowCheckpointingITCase.java  |   4 +-
 .../EventTimeWindowCheckpointingITCase.java     |   4 +-
 .../PartitionedStateCheckpointingITCase.java    |  26 +-
 .../test/checkpointing/SavepointITCase.java     |  10 +-
 .../checkpointing/StateCheckpointedITCase.java  |   8 +-
 .../StreamCheckpointNotifierITCase.java         |  26 +-
 .../WindowCheckpointingITCase.java              |   4 +-
 .../jar/CheckpointedStreamingProgram.java       |   4 +-
 .../flink/test/recovery/ChaosMonkeyITCase.java  |   6 +-
 .../JobManagerCheckpointRecoveryITCase.java     |   4 +-
 113 files changed, 5279 insertions(+), 2532 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
index ad5ec56..c55b3c0 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
@@ -17,17 +17,26 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Random;
 import java.util.concurrent.Callable;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.GenericListState;
+import org.apache.flink.runtime.state.GenericReducingState;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
@@ -36,9 +45,9 @@ import org.slf4j.LoggerFactory;
 import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
 
 /**
- * {@link StateBackend} for storing checkpoints in JDBC supporting databases.
+ * {@link AbstractStateBackend} for storing checkpoints in JDBC supporting databases.
  * Key-Value state is stored out-of-core and is lazily fetched using the
- * {@link LazyDbKvState} implementation. A different backend can also be
+ * {@link LazyDbValueState} implementation. A different backend can also be
  * provided in the constructor to store the non-partitioned states. A common use
  * case would be to store the key-value states in the database and store larger
  * non-partitioned states on a distributed file system.
@@ -56,7 +65,7 @@ import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
  * {@link MySqlAdapter} can be supplied in the {@link DbBackendConfig}.
  *
  */
-public class DbStateBackend extends StateBackend<DbStateBackend> {
+public class DbStateBackend extends AbstractStateBackend {
 
 	private static final long serialVersionUID = 1L;
 	private static final Logger LOG = LoggerFactory.getLogger(DbStateBackend.class);
@@ -79,10 +88,12 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
 
 	private transient PreparedStatement insertStatement;
 
+	private String operatorIdentifier;
+
 	// ------------------------------------------------------
 
 	// We allow to use a different backend for storing non-partitioned states
-	private StateBackend<?> nonPartitionedStateBackend = null;
+	private AbstractStateBackend nonPartitionedStateBackend = null;
 
 	// ------------------------------------------------------
 
@@ -104,7 +115,7 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
 	 * non-partitioned state snapshots.
 	 * 
 	 */
-	public DbStateBackend(DbBackendConfig backendConfig, StateBackend<?> backend) {
+	public DbStateBackend(DbBackendConfig backendConfig, AbstractStateBackend backend) {
 		this(backendConfig);
 		this.nonPartitionedStateBackend = backend;
 	}
@@ -160,7 +171,7 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
 
 					insertStatement.executeUpdate();
 
-					return new DbStateHandle<S>(appIdShort, checkpointID, timestamp, handleId,
+					return new DbStateHandle<>(appIdShort, checkpointID, timestamp, handleId,
 							dbConfig, serializedState.length);
 				}
 			}, numSqlRetries, sqlRetrySleep);
@@ -182,20 +193,46 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
 	}
 
 	@Override
-	public <K, V> LazyDbKvState<K, V> createKvState(String stateId, String stateName,
-			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws IOException {
-		return new LazyDbKvState<K, V>(
-				stateId + "_" + env.getApplicationID().toShortString(),
-				env.getTaskInfo().getIndexOfThisSubtask() == 0,
-				getConnections(),
-				getConfiguration(),
-				keySerializer,
-				valueSerializer,
-				defaultValue);
+	protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer,
+		ValueStateDescriptor<T> stateDesc) throws Exception {
+		String stateName = operatorIdentifier + "_"+ stateDesc.getName();
+
+		return new LazyDbValueState<>(
+			stateName,
+			env.getTaskInfo().getIndexOfThisSubtask() == 0,
+			getConnections(),
+			getConfiguration(),
+			keySerializer,
+			namespaceSerializer,
+			stateDesc);
+	}
+
+	@Override
+	protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer,
+		ListStateDescriptor<T> stateDesc) throws Exception {
+		ValueStateDescriptor<ArrayList<T>> valueStateDescriptor = new ValueStateDescriptor<>(stateDesc.getName(), null, new ArrayListSerializer<>(stateDesc.getSerializer()));
+		ValueState<ArrayList<T>> valueState = createValueState(namespaceSerializer, valueStateDescriptor);
+		return new GenericListState<>(valueState);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer,
+		ReducingStateDescriptor<T> stateDesc) throws Exception {
+
+		ValueStateDescriptor<T> valueStateDescriptor = new ValueStateDescriptor<>(stateDesc.getName(), null, stateDesc.getSerializer());
+		ValueState<T> valueState = createValueState(namespaceSerializer, valueStateDescriptor);
+		return new GenericReducingState<>(valueState, stateDesc.getReduceFunction());
 	}
 
 	@Override
-	public void initializeForJob(final Environment env) throws Exception {
+	public void initializeForJob(final Environment env,
+		String operatorIdentifier,
+		TypeSerializer<?> keySerializer) throws Exception {
+		super.initializeForJob(env, operatorIdentifier, keySerializer);
+
+		this.operatorIdentifier = operatorIdentifier;
+
 		this.rnd = new Random();
 		this.env = env;
 
@@ -221,7 +258,7 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
 				}
 			}, numSqlRetries, sqlRetrySleep);
 		} else {
-			nonPartitionedStateBackend.initializeForJob(env);
+			nonPartitionedStateBackend.initializeForJob(env, operatorIdentifier, keySerializer);
 		}
 
 		if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java
deleted file mode 100644
index 5d16be6..0000000
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java
+++ /dev/null
@@ -1,631 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *	http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.contrib.streaming.state.ShardedConnection.ShardedStatement;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.util.InstantiationUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-
-/**
- * 
- * Lazily fetched {@link KvState} using a SQL backend. Key-value pairs are
- * cached on heap and are lazily retrieved on access.
- * 
- */
-public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend>, CheckpointNotifier {
-
-	private static final Logger LOG = LoggerFactory.getLogger(LazyDbKvState.class);
-
-	// ------------------------------------------------------
-
-	// Unique id for this state (appID_operatorID_stateName)
-	private final String kvStateId;
-	private final boolean compact;
-
-	private K currentKey;
-	private final V defaultValue;
-
-	private final TypeSerializer<K> keySerializer;
-	private final TypeSerializer<V> valueSerializer;
-
-	// ------------------------------------------------------
-
-	// Max number of retries for failed database operations
-	private final int numSqlRetries;
-	// Sleep time between two retries
-	private final int sqlRetrySleep;
-	// Max number of key-value pairs inserted in one batch to the database
-	private final int maxInsertBatchSize;
-	// We will do database compaction every so many checkpoints
-	private final int compactEvery;
-	// Executor for automatic compactions
-	private ExecutorService executor = null;
-
-	// Database properties
-	private final DbBackendConfig conf;
-	private final ShardedConnection connections;
-	private final DbAdapter dbAdapter;
-
-	// Convenience object for handling inserts to the database
-	private final BatchInserter batchInsert;
-
-	// Statements for key-lookups and inserts as prepared by the dbAdapter
-	private ShardedStatement selectStatements;
-	private ShardedStatement insertStatements;
-
-	// ------------------------------------------------------
-
-	// LRU cache for the key-value states backed by the database
-	private final StateCache cache;
-
-	private long nextTs;
-	private Map<Long, Long> completedCheckpoints = new HashMap<>();
-
-	private volatile long lastCompactedTs;
-
-	// ------------------------------------------------------
-
-	/**
-	 * Constructor to initialize the {@link LazyDbKvState} the first time the
-	 * job starts.
-	 */
-	public LazyDbKvState(String kvStateId, boolean compact, ShardedConnection cons, DbBackendConfig conf,
-			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws IOException {
-		this(kvStateId, compact, cons, conf, keySerializer, valueSerializer, defaultValue, 1, 0);
-	}
-
-	/**
-	 * Initialize the {@link LazyDbKvState} from a snapshot.
-	 */
-	public LazyDbKvState(String kvStateId, boolean compact, ShardedConnection cons, final DbBackendConfig conf,
-			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue, long nextTs,
-			long lastCompactedTs)
-					throws IOException {
-
-		this.kvStateId = kvStateId;
-		this.compact = compact;
-		if (compact) {
-			// Compactions will run in a seperate thread
-			executor = Executors.newSingleThreadExecutor();
-		}
-
-		this.keySerializer = keySerializer;
-		this.valueSerializer = valueSerializer;
-		this.defaultValue = defaultValue;
-
-		this.maxInsertBatchSize = conf.getMaxKvInsertBatchSize();
-		this.conf = conf;
-		this.connections = cons;
-		this.dbAdapter = conf.getDbAdapter();
-		this.compactEvery = conf.getKvStateCompactionFrequency();
-		this.numSqlRetries = conf.getMaxNumberOfSqlRetries();
-		this.sqlRetrySleep = conf.getSleepBetweenSqlRetries();
-
-		this.nextTs = nextTs;
-		this.lastCompactedTs = lastCompactedTs;
-
-		this.cache = new StateCache(conf.getKvCacheSize(), conf.getNumElementsToEvict());
-
-		initDB(this.connections);
-
-		batchInsert = new BatchInserter(connections.getNumShards());
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Lazy database kv-state ({}) successfully initialized", kvStateId);
-		}
-	}
-
-	@Override
-	public void setCurrentKey(K key) {
-		this.currentKey = key;
-	}
-
-	@Override
-	public void update(V value) throws IOException {
-		try {
-			cache.put(currentKey, Optional.fromNullable(value));
-		} catch (RuntimeException e) {
-			// We need to catch the RuntimeExceptions thrown in the StateCache
-			// methods here
-			throw new IOException(e);
-		}
-	}
-
-	@Override
-	public V value() throws IOException {
-		try {
-			// We get the value from the cache (which will automatically load it
-			// from the database if necessary). If null, we return a copy of the
-			// default value
-			V val = cache.get(currentKey).orNull();
-			return val != null ? val : copyDefault();
-		} catch (RuntimeException e) {
-			// We need to catch the RuntimeExceptions thrown in the StateCache
-			// methods here
-			throw new IOException(e);
-		}
-	}
-
-	@Override
-	public DbKvStateSnapshot<K, V> snapshot(long checkpointId, long timestamp) throws IOException {
-
-		// Validate timing assumptions
-		if (timestamp <= nextTs) {
-			throw new RuntimeException("Checkpoint timestamp is smaller than previous ts + 1, "
-					+ "this should not happen.");
-		}
-
-		// If there are any modified states we perform the inserts
-		if (!cache.modified.isEmpty()) {
-			// We insert the modified elements to the database with the current
-			// timestamp then clear the modified states
-			for (Entry<K, Optional<V>> state : cache.modified.entrySet()) {
-				batchInsert.add(state, timestamp);
-			}
-			batchInsert.flush(timestamp);
-			cache.modified.clear();
-		} else if (compact) {
-			// Otherwise we call the keep alive method to avoid dropped
-			// connections (only call this on the compactor instance)
-			for (final Connection c : connections.connections()) {
-				SQLRetrier.retry(new Callable<Void>() {
-					@Override
-					public Void call() throws Exception {
-						dbAdapter.keepAlive(c);
-						return null;
-					}
-				}, numSqlRetries, sqlRetrySleep);
-			}
-		}
-
-		nextTs = timestamp + 1;
-		completedCheckpoints.put(checkpointId, timestamp);
-		return new DbKvStateSnapshot<K, V>(kvStateId, timestamp, lastCompactedTs);
-	}
-
-	/**
-	 * Returns the number of elements currently stored in the task's cache. Note
-	 * that the number of elements in the database is not counted here.
-	 */
-	@Override
-	public int size() {
-		return cache.size();
-	}
-
-	/**
-	 * Return a copy the default value or null if the default was null.
-	 * 
-	 */
-	private V copyDefault() {
-		return defaultValue != null ? valueSerializer.copy(defaultValue) : null;
-	}
-
-	/**
-	 * Create a table for the kvstate checkpoints (based on the kvStateId) and
-	 * prepare the statements used during checkpointing.
-	 */
-	private void initDB(final ShardedConnection cons) throws IOException {
-
-		retry(new Callable<Void>() {
-			public Void call() throws Exception {
-
-				for (Connection con : cons.connections()) {
-					dbAdapter.createKVStateTable(kvStateId, con);
-				}
-
-				insertStatements = cons.prepareStatement(dbAdapter.prepareKVCheckpointInsert(kvStateId));
-				selectStatements = cons.prepareStatement(dbAdapter.prepareKeyLookup(kvStateId));
-
-				return null;
-			}
-
-		}, numSqlRetries, sqlRetrySleep);
-	}
-
-	@Override
-	public void notifyCheckpointComplete(long checkpointId) {
-		final Long ts = completedCheckpoints.remove(checkpointId);
-		if (ts == null) {
-			LOG.warn("Complete notification for missing checkpoint: " + checkpointId);
-		} else {
-			// If compaction is turned on we compact on the compactor subtask
-			// asynchronously in the background
-			if (compactEvery > 0 && compact && checkpointId % compactEvery == 0) {
-				executor.execute(new Compactor(ts));
-			}
-		}
-	}
-
-	@Override
-	public void dispose() {
-		// We are only closing the statements here, the connection is borrowed
-		// from the state backend and will be closed there.
-		try {
-			selectStatements.close();
-		} catch (SQLException e) {
-			// There is not much to do about this
-		}
-		try {
-			insertStatements.close();
-		} catch (SQLException e) {
-			// There is not much to do about this
-		}
-
-		if (executor != null) {
-			executor.shutdown();
-		}
-	}
-
-	/**
-	 * Return the Map of cached states.
-	 * 
-	 */
-	public Map<K, Optional<V>> getStateCache() {
-		return cache;
-	}
-
-	/**
-	 * Return the Map of modified states that hasn't been written to the
-	 * database yet.
-	 * 
-	 */
-	public Map<K, Optional<V>> getModified() {
-		return cache.modified;
-	}
-
-	/**
-	 * Used for testing purposes
-	 */
-	public boolean isCompactor() {
-		return compact;
-	}
-
-	/**
-	 * Used for testing purposes
-	 */
-	public ExecutorService getExecutor() {
-		return executor;
-	}
-
-	/**
-	 * Snapshot that stores a specific checkpoint timestamp and state id, and
-	 * also rolls back the database to that point upon restore. The rollback is
-	 * done by removing all state checkpoints that have timestamps between the
-	 * checkpoint and recovery timestamp.
-	 *
-	 */
-	private static class DbKvStateSnapshot<K, V> implements KvStateSnapshot<K, V, DbStateBackend> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final String kvStateId;
-		private final long checkpointTimestamp;
-		private final long lastCompactedTimestamp;
-
-		public DbKvStateSnapshot(String kvStateId, long checkpointTimestamp, long lastCompactedTs) {
-			this.checkpointTimestamp = checkpointTimestamp;
-			this.kvStateId = kvStateId;
-			this.lastCompactedTimestamp = lastCompactedTs;
-		}
-
-		@Override
-		public LazyDbKvState<K, V> restoreState(final DbStateBackend stateBackend,
-				final TypeSerializer<K> keySerializer, final TypeSerializer<V> valueSerializer, final V defaultValue,
-				ClassLoader classLoader, final long recoveryTimestamp) throws IOException {
-
-			// Validate timing assumptions
-			if (recoveryTimestamp <= checkpointTimestamp) {
-				throw new RuntimeException(
-						"Recovery timestamp is smaller or equal to checkpoint timestamp. "
-								+ "This might happen if the job was started with a new JobManager "
-								+ "and the clocks got really out of sync.");
-			}
-
-			// First we clean up the states written by partially failed
-			// snapshots
-			retry(new Callable<Void>() {
-				public Void call() throws Exception {
-
-					// We need to perform cleanup on all shards to be safe here
-					for (Connection c : stateBackend.getConnections().connections()) {
-						stateBackend.getConfiguration().getDbAdapter().cleanupFailedCheckpoints(kvStateId,
-								c, checkpointTimestamp, recoveryTimestamp);
-					}
-
-					return null;
-				}
-			}, stateBackend.getConfiguration().getMaxNumberOfSqlRetries(),
-					stateBackend.getConfiguration().getSleepBetweenSqlRetries());
-
-			boolean cleanup = stateBackend.getEnvironment().getTaskInfo().getIndexOfThisSubtask() == 0;
-
-			// Restore the KvState
-			LazyDbKvState<K, V> restored = new LazyDbKvState<K, V>(kvStateId, cleanup,
-					stateBackend.getConnections(), stateBackend.getConfiguration(), keySerializer, valueSerializer,
-					defaultValue, recoveryTimestamp, lastCompactedTimestamp);
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("KV state({},{}) restored.", kvStateId, recoveryTimestamp);
-			}
-
-			return restored;
-		}
-
-		@Override
-		public void discardState() throws Exception {
-			// Don't discard, it will be compacted by the LazyDbKvState
-		}
-
-		@Override
-		public long getStateSize() throws Exception {
-			// Because the state is serialzied in a lazy fashion we don't know
-			// the size of the state yet.
-			return 0;
-		}
-
-	}
-
-	/**
-	 * LRU cache implementation for storing the key-value states. When the cache
-	 * is full elements are not evicted one by one but are evicted in a batch
-	 * defined by the evictionSize parameter.
-	 * <p>
-	 * Keys not found in the cached will be retrieved from the underlying
-	 * database
-	 */
-	private final class StateCache extends LinkedHashMap<K, Optional<V>> {
-		private static final long serialVersionUID = 1L;
-
-		private final int cacheSize;
-		private final int evictionSize;
-
-		// We keep track the state modified since the last checkpoint
-		private final Map<K, Optional<V>> modified = new HashMap<>();
-
-		public StateCache(int cacheSize, int evictionSize) {
-			super(cacheSize, 0.75f, true);
-			this.cacheSize = cacheSize;
-			this.evictionSize = evictionSize;
-		}
-
-		@Override
-		public Optional<V> put(K key, Optional<V> value) {
-			// Put kv pair in the cache and evict elements if the cache is full
-			Optional<V> old = super.put(key, value);
-			modified.put(key, value);
-			evictIfFull();
-			return old;
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public Optional<V> get(Object key) {
-			// First we check whether the value is cached
-			Optional<V> value = super.get(key);
-			if (value == null) {
-				// If it doesn't try to load it from the database
-				value = Optional.fromNullable(getFromDatabaseOrNull((K) key));
-				put((K) key, value);
-			}
-			return value;
-		}
-
-		@Override
-		protected boolean removeEldestEntry(Entry<K, Optional<V>> eldest) {
-			// We need to remove elements manually if the cache becomes full, so
-			// we always return false here.
-			return false;
-		}
-
-		/**
-		 * Fetch the current value from the database if exists or return null.
-		 * 
-		 * @param key
-		 * @return The value corresponding to the key and the last checkpointid
-		 *         from the database if exists or null.
-		 */
-		private V getFromDatabaseOrNull(final K key) {
-			try {
-				return retry(new Callable<V>() {
-					public V call() throws Exception {
-						byte[] serializedKey = InstantiationUtil.serializeToByteArray(keySerializer, key);
-						// We lookup using the adapter and serialize/deserialize
-						// with the TypeSerializers
-						byte[] serializedVal = dbAdapter.lookupKey(kvStateId,
-								selectStatements.getForKey(key), serializedKey, nextTs);
-
-						return serializedVal != null
-								? InstantiationUtil.deserializeFromByteArray(valueSerializer, serializedVal) : null;
-					}
-				}, numSqlRetries, sqlRetrySleep);
-			} catch (IOException e) {
-				// We need to re-throw this exception to conform to the map
-				// interface, we will catch this when we call the the put/get
-				throw new RuntimeException(e);
-			}
-		}
-
-		/**
-		 * If the cache is full we remove the evictionSize least recently
-		 * accessed elements and write them to the database if they were
-		 * modified since the last checkpoint.
-		 */
-		private void evictIfFull() {
-			if (size() > cacheSize) {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("State cache is full for {}, evicting {} elements.", kvStateId, evictionSize);
-				}
-				try {
-					int numEvicted = 0;
-
-					Iterator<Entry<K, Optional<V>>> entryIterator = entrySet().iterator();
-					while (numEvicted++ < evictionSize && entryIterator.hasNext()) {
-
-						Entry<K, Optional<V>> next = entryIterator.next();
-
-						// We only need to write to the database if modified
-						if (modified.remove(next.getKey()) != null) {
-							batchInsert.add(next, nextTs);
-						}
-
-						entryIterator.remove();
-					}
-
-					batchInsert.flush(nextTs);
-
-				} catch (IOException e) {
-					// We need to re-throw this exception to conform to the map
-					// interface, we will catch this when we call the the
-					// put/get
-					throw new RuntimeException(e);
-				}
-			}
-		}
-
-		@Override
-		public void putAll(Map<? extends K, ? extends Optional<V>> m) {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public void clear() {
-			throw new UnsupportedOperationException();
-		}
-	}
-
-	/**
-	 * Object for handling inserts to the database by batching them together
-	 * partitioned on the sharding key. The batches are written to the database
-	 * when they are full or when the inserter is flushed.
-	 *
-	 */
-	private class BatchInserter {
-
-		// Map from shard index to the kv pairs to be inserted
-		// Map<Integer, List<Tuple2<byte[], byte[]>>> inserts = new HashMap<>();
-
-		List<Tuple2<byte[], byte[]>>[] inserts;
-
-		@SuppressWarnings("unchecked")
-		public BatchInserter(int numShards) {
-			inserts = new List[numShards];
-			for (int i = 0; i < numShards; i++) {
-				inserts[i] = new ArrayList<>();
-			}
-		}
-
-		public void add(Entry<K, Optional<V>> next, long timestamp) throws IOException {
-
-			K key = next.getKey();
-			V value = next.getValue().orNull();
-
-			// Get the current partition if present or initialize empty list
-			int shardIndex = connections.getShardIndex(key);
-
-			List<Tuple2<byte[], byte[]>> insertPartition = inserts[shardIndex];
-
-			// Add the k-v pair to the partition
-			byte[] k = InstantiationUtil.serializeToByteArray(keySerializer, key);
-			byte[] v = value != null ? InstantiationUtil.serializeToByteArray(valueSerializer, value) : null;
-			insertPartition.add(Tuple2.of(k, v));
-
-			// If partition is full write to the database and clear
-			if (insertPartition.size() == maxInsertBatchSize) {
-				dbAdapter.insertBatch(kvStateId, conf,
-						connections.getForIndex(shardIndex),
-						insertStatements.getForIndex(shardIndex),
-						timestamp, insertPartition);
-
-				insertPartition.clear();
-			}
-		}
-
-		public void flush(long timestamp) throws IOException {
-			// We flush all non-empty partitions
-			for (int i = 0; i < inserts.length; i++) {
-				List<Tuple2<byte[], byte[]>> insertPartition = inserts[i];
-				if (!insertPartition.isEmpty()) {
-					dbAdapter.insertBatch(kvStateId, conf, connections.getForIndex(i),
-							insertStatements.getForIndex(i), timestamp, insertPartition);
-					insertPartition.clear();
-				}
-			}
-
-		}
-	}
-
-	private class Compactor implements Runnable {
-
-		private long upperBound;
-
-		public Compactor(long upperBound) {
-			this.upperBound = upperBound;
-		}
-
-		@Override
-		public void run() {
-			// We create new database connections to make sure we don't
-			// interfere with the checkpointing (connections are not thread
-			// safe)
-			try (ShardedConnection sc = conf.createShardedConnection()) {
-				for (final Connection c : sc.connections()) {
-					SQLRetrier.retry(new Callable<Void>() {
-						@Override
-						public Void call() throws Exception {
-							dbAdapter.compactKvStates(kvStateId, c, lastCompactedTs, upperBound);
-							return null;
-						}
-					}, numSqlRetries, sqlRetrySleep);
-				}
-				if (LOG.isInfoEnabled()) {
-					LOG.info("State succesfully compacted for {} between {} and {}.", kvStateId,
-							lastCompactedTs,
-							upperBound);
-				}
-				lastCompactedTs = upperBound;
-			} catch (SQLException | IOException e) {
-				LOG.warn("State compaction failed due: {}", e);
-			}
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbValueState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbValueState.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbValueState.java
new file mode 100644
index 0000000..753850a
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbValueState.java
@@ -0,0 +1,692 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.ShardedConnection.ShardedStatement;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+
+/**
+ * 
+ * Lazily fetched {@link KvState} using a SQL backend. Key-value pairs are
+ * cached on heap and are lazily retrieved on access.
+ * 
+ */
+public class LazyDbValueState<K, N, V>
+	implements KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, DbStateBackend>, ValueState<V>, CheckpointListener {
+
+	private static final Logger LOG = LoggerFactory.getLogger(LazyDbValueState.class);
+
+	// ------------------------------------------------------
+
+	// Unique id for this state (appID_operatorID_stateName)
+	private final String kvStateId;
+	private final boolean compact;
+
+	private K currentKey;
+	private N currentNamespace;
+	private final V defaultValue;
+
+	private final TypeSerializer<K> keySerializer;
+	private final TypeSerializer<N> namespaceSerializer;
+	private final TypeSerializer<V> valueSerializer;
+	private final ValueStateDescriptor<V> stateDesc;
+
+	// ------------------------------------------------------
+
+	// Max number of retries for failed database operations
+	private final int numSqlRetries;
+	// Sleep time between two retries
+	private final int sqlRetrySleep;
+	// Max number of key-value pairs inserted in one batch to the database
+	private final int maxInsertBatchSize;
+	// We will do database compaction every so many checkpoints
+	private final int compactEvery;
+	// Executor for automatic compactions
+	private ExecutorService executor = null;
+
+	// Database properties
+	private final DbBackendConfig conf;
+	private final ShardedConnection connections;
+	private final DbAdapter dbAdapter;
+
+	// Convenience object for handling inserts to the database
+	private final BatchInserter batchInsert;
+
+	// Statements for key-lookups and inserts as prepared by the dbAdapter
+	private ShardedStatement selectStatements;
+	private ShardedStatement insertStatements;
+
+	// ------------------------------------------------------
+
+	// LRU cache for the key-value states backed by the database
+	private final StateCache cache;
+
+	private long nextTs;
+	private Map<Long, Long> completedCheckpoints = new HashMap<>();
+
+	private volatile long lastCompactedTs;
+
+	// ------------------------------------------------------
+
+	/**
+	 * Constructor to initialize the {@link LazyDbValueState} the first time the
+	 * job starts.
+	 */
+	public LazyDbValueState(String kvStateId,
+		boolean compact,
+		ShardedConnection cons,
+		DbBackendConfig conf,
+		TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		ValueStateDescriptor<V> stateDesc) throws IOException {
+		this(kvStateId, compact, cons, conf, keySerializer, namespaceSerializer, stateDesc, 1, 0);
+	}
+
+	/**
+	 * Initialize the {@link LazyDbValueState} from a snapshot.
+	 */
+	public LazyDbValueState(String kvStateId,
+		boolean compact,
+		ShardedConnection cons,
+		final DbBackendConfig conf,
+		TypeSerializer<K> keySerializer,
+		TypeSerializer<N> namespaceSerializer,
+		ValueStateDescriptor<V> stateDesc,
+		long nextTs,
+		long lastCompactedTs) throws IOException {
+
+		this.kvStateId = kvStateId;
+		this.compact = compact;
+		if (compact) {
+			// Compactions will run in a seperate thread
+			executor = Executors.newSingleThreadExecutor();
+		}
+
+		this.keySerializer = keySerializer;
+		this.namespaceSerializer = namespaceSerializer;
+		this.valueSerializer = stateDesc.getSerializer();
+		this.defaultValue = stateDesc.getDefaultValue();
+		this.stateDesc = stateDesc;
+
+		this.maxInsertBatchSize = conf.getMaxKvInsertBatchSize();
+		this.conf = conf;
+		this.connections = cons;
+		this.dbAdapter = conf.getDbAdapter();
+		this.compactEvery = conf.getKvStateCompactionFrequency();
+		this.numSqlRetries = conf.getMaxNumberOfSqlRetries();
+		this.sqlRetrySleep = conf.getSleepBetweenSqlRetries();
+
+		this.nextTs = nextTs;
+		this.lastCompactedTs = lastCompactedTs;
+
+		this.cache = new StateCache(conf.getKvCacheSize(), conf.getNumElementsToEvict());
+
+		initDB(this.connections);
+
+		batchInsert = new BatchInserter(connections.getNumShards());
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Lazy database kv-state ({}) successfully initialized", kvStateId);
+		}
+	}
+
+	@Override
+	public void setCurrentKey(K key) {
+		this.currentKey = key;
+	}
+
+	@Override
+	public void setCurrentNamespace(N namespace) {
+		this.currentNamespace = namespace;
+	}
+
+	@Override
+	public void update(V value) throws IOException {
+		try {
+			cache.put(Tuple2.of(currentKey, currentNamespace), Optional.fromNullable(value));
+		} catch (RuntimeException e) {
+			// We need to catch the RuntimeExceptions thrown in the StateCache
+			// methods here
+			throw new IOException(e);
+		}
+	}
+
+	@Override
+	public V value() throws IOException {
+		try {
+			// We get the value from the cache (which will automatically load it
+			// from the database if necessary). If null, we return a copy of the
+			// default value
+			V val = cache.get(Tuple2.of(currentKey, currentNamespace)).orNull();
+			return val != null ? val : copyDefault();
+		} catch (RuntimeException e) {
+			// We need to catch the RuntimeExceptions thrown in the StateCache
+			// methods here
+			throw new IOException(e);
+		}
+	}
+
+	@Override
+	public void clear() {
+		cache.put(Tuple2.of(currentKey, currentNamespace), Optional.<V>fromNullable(null));
+	}
+
+	@Override
+	public DbKvStateSnapshot<K, N, V> snapshot(long checkpointId, long timestamp) throws IOException {
+
+		// Validate timing assumptions
+		if (timestamp <= nextTs) {
+			throw new RuntimeException("Checkpoint timestamp is smaller than previous ts + 1, "
+					+ "this should not happen.");
+		}
+
+		// If there are any modified states we perform the inserts
+		if (!cache.modified.isEmpty()) {
+			// We insert the modified elements to the database with the current
+			// timestamp then clear the modified states
+			for (Entry<Tuple2<K, N>, Optional<V>> state : cache.modified.entrySet()) {
+				batchInsert.add(state, timestamp);
+			}
+			batchInsert.flush(timestamp);
+			cache.modified.clear();
+		} else if (compact) {
+			// Otherwise we call the keep alive method to avoid dropped
+			// connections (only call this on the compactor instance)
+			for (final Connection c : connections.connections()) {
+				SQLRetrier.retry(new Callable<Void>() {
+					@Override
+					public Void call() throws Exception {
+						dbAdapter.keepAlive(c);
+						return null;
+					}
+				}, numSqlRetries, sqlRetrySleep);
+			}
+		}
+
+		nextTs = timestamp + 1;
+		completedCheckpoints.put(checkpointId, timestamp);
+		return new DbKvStateSnapshot<K, N, V>(kvStateId, timestamp, lastCompactedTs, namespaceSerializer, stateDesc);
+	}
+
+	/**
+	 * Returns the number of elements currently stored in the task's cache. Note
+	 * that the number of elements in the database is not counted here.
+	 */
+	public int size() {
+		return cache.size();
+	}
+
+	/**
+	 * Return a copy the default value or null if the default was null.
+	 * 
+	 */
+	private V copyDefault() {
+		return defaultValue != null ? valueSerializer.copy(defaultValue) : null;
+	}
+
+	/**
+	 * Create a table for the kvstate checkpoints (based on the kvStateId) and
+	 * prepare the statements used during checkpointing.
+	 */
+	private void initDB(final ShardedConnection cons) throws IOException {
+
+		retry(new Callable<Void>() {
+			public Void call() throws Exception {
+
+				for (Connection con : cons.connections()) {
+					dbAdapter.createKVStateTable(kvStateId, con);
+				}
+
+				insertStatements = cons.prepareStatement(dbAdapter.prepareKVCheckpointInsert(kvStateId));
+				selectStatements = cons.prepareStatement(dbAdapter.prepareKeyLookup(kvStateId));
+
+				return null;
+			}
+
+		}, numSqlRetries, sqlRetrySleep);
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) {
+		final Long ts = completedCheckpoints.remove(checkpointId);
+		if (ts == null) {
+			LOG.warn("Complete notification for missing checkpoint: " + checkpointId);
+		} else {
+			// If compaction is turned on we compact on the compactor subtask
+			// asynchronously in the background
+			if (compactEvery > 0 && compact && checkpointId % compactEvery == 0) {
+				executor.execute(new Compactor(ts));
+			}
+		}
+	}
+
+	@Override
+	public void dispose() {
+		// We are only closing the statements here, the connection is borrowed
+		// from the state backend and will be closed there.
+		try {
+			selectStatements.close();
+		} catch (SQLException e) {
+			// There is not much to do about this
+		}
+		try {
+			insertStatements.close();
+		} catch (SQLException e) {
+			// There is not much to do about this
+		}
+
+		if (executor != null) {
+			executor.shutdown();
+		}
+	}
+
+	/**
+	 * Return the Map of cached states.
+	 * 
+	 */
+	public Map<Tuple2<K, N>, Optional<V>> getStateCache() {
+		return cache;
+	}
+
+	/**
+	 * Return the Map of modified states that hasn't been written to the
+	 * database yet.
+	 * 
+	 */
+	public Map<Tuple2<K, N>, Optional<V>> getModified() {
+		return cache.modified;
+	}
+
+	/**
+	 * Used for testing purposes
+	 */
+	public boolean isCompactor() {
+		return compact;
+	}
+
+	/**
+	 * Used for testing purposes
+	 */
+	public ExecutorService getExecutor() {
+		return executor;
+	}
+
+	/**
+	 * Snapshot that stores a specific checkpoint timestamp and state id, and
+	 * also rolls back the database to that point upon restore. The rollback is
+	 * done by removing all state checkpoints that have timestamps between the
+	 * checkpoint and recovery timestamp.
+	 *
+	 */
+	private static class DbKvStateSnapshot<K, N, V> implements KvStateSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, DbStateBackend> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final String kvStateId;
+		private final long checkpointTimestamp;
+		private final long lastCompactedTimestamp;
+
+		/** Namespace Serializer */
+		private final TypeSerializer<N> namespaceSerializer;
+
+		/** StateDescriptor, for sanity checks */
+		private final ValueStateDescriptor<V> stateDesc;
+
+		public DbKvStateSnapshot(String kvStateId,
+			long checkpointTimestamp,
+			long lastCompactedTs,
+			TypeSerializer<N> namespaceSerializer,
+			ValueStateDescriptor<V> stateDesc) {
+			this.checkpointTimestamp = checkpointTimestamp;
+			this.kvStateId = kvStateId;
+			this.lastCompactedTimestamp = lastCompactedTs;
+			this.namespaceSerializer = namespaceSerializer;
+			this.stateDesc = stateDesc;
+		}
+
+		@Override
+		public KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, DbStateBackend> restoreState(
+			final DbStateBackend stateBackend,
+			TypeSerializer<K> keySerializer,
+			ClassLoader classLoader,
+			final long recoveryTimestamp) throws Exception {
+
+			// Validate timing assumptions
+			if (recoveryTimestamp <= checkpointTimestamp) {
+				throw new RuntimeException(
+						"Recovery timestamp is smaller or equal to checkpoint timestamp. "
+								+ "This might happen if the job was started with a new JobManager "
+								+ "and the clocks got really out of sync.");
+			}
+
+			// First we clean up the states written by partially failed
+			// snapshots
+			retry(new Callable<Void>() {
+				public Void call() throws Exception {
+
+					// We need to perform cleanup on all shards to be safe here
+					for (Connection c : stateBackend.getConnections().connections()) {
+						stateBackend.getConfiguration().getDbAdapter().cleanupFailedCheckpoints(kvStateId,
+								c, checkpointTimestamp, recoveryTimestamp);
+					}
+
+					return null;
+				}
+			}, stateBackend.getConfiguration().getMaxNumberOfSqlRetries(),
+					stateBackend.getConfiguration().getSleepBetweenSqlRetries());
+
+			boolean cleanup = stateBackend.getEnvironment().getTaskInfo().getIndexOfThisSubtask() == 0;
+
+			// Restore the KvState
+			LazyDbValueState<K, N, V> restored = new LazyDbValueState<>(kvStateId,
+				cleanup,
+				stateBackend.getConnections(),
+				stateBackend.getConfiguration(),
+				keySerializer,
+				namespaceSerializer,
+				stateDesc,
+				recoveryTimestamp,
+				lastCompactedTimestamp);
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("KV state({},{}) restored.", kvStateId, recoveryTimestamp);
+			}
+
+			return restored;
+		}
+
+		@Override
+		public void discardState() throws Exception {
+			// Don't discard, it will be compacted by the LazyDbKvState
+		}
+
+		@Override
+		public long getStateSize() throws Exception {
+			// Because the state is serialzied in a lazy fashion we don't know
+			// the size of the state yet.
+			return 0;
+		}
+
+	}
+
+	/**
+	 * LRU cache implementation for storing the key-value states. When the cache
+	 * is full elements are not evicted one by one but are evicted in a batch
+	 * defined by the evictionSize parameter.
+	 * <p>
+	 * Keys not found in the cached will be retrieved from the underlying
+	 * database
+	 */
+	private final class StateCache extends LinkedHashMap<Tuple2<K, N>, Optional<V>> {
+		private static final long serialVersionUID = 1L;
+
+		private final int cacheSize;
+		private final int evictionSize;
+
+		// We keep track the state modified since the last checkpoint
+		private final Map<Tuple2<K, N>, Optional<V>> modified = new HashMap<>();
+
+		public StateCache(int cacheSize, int evictionSize) {
+			super(cacheSize, 0.75f, true);
+			this.cacheSize = cacheSize;
+			this.evictionSize = evictionSize;
+		}
+
+		@Override
+		public Optional<V> put(Tuple2<K, N> key, Optional<V> value) {
+			// Put kv pair in the cache and evict elements if the cache is full
+			Optional<V> old = super.put(key, value);
+			modified.put(key, value);
+			evictIfFull();
+			return old;
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public Optional<V> get(Object key) {
+			// First we check whether the value is cached
+			Optional<V> value = super.get(key);
+			if (value == null) {
+				// If it doesn't try to load it from the database
+				value = Optional.fromNullable(getFromDatabaseOrNull((Tuple2<K, N>) key));
+				put((Tuple2<K, N>) key, value);
+			}
+			return value;
+		}
+
+		@Override
+		protected boolean removeEldestEntry(Entry<Tuple2<K, N>, Optional<V>> eldest) {
+			// We need to remove elements manually if the cache becomes full, so
+			// we always return false here.
+			return false;
+		}
+
+		/**
+		 * Fetch the current value from the database if exists or return null.
+		 * 
+		 * @param key
+		 * @return The value corresponding to the key and the last checkpointid
+		 *         from the database if exists or null.
+		 */
+		private V getFromDatabaseOrNull(final Tuple2<K, N> key) {
+			try {
+				return retry(new Callable<V>() {
+					public V call() throws Exception {
+						ByteArrayOutputStream baos = new ByteArrayOutputStream();
+						DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+						keySerializer.serialize(key.f0, out);
+						namespaceSerializer.serialize(key.f1, out);
+						out.close();
+
+						// We lookup using the adapter and serialize/deserialize
+						// with the TypeSerializers
+						byte[] serializedVal = dbAdapter.lookupKey(kvStateId,
+								selectStatements.getForKey(key.f0), baos.toByteArray(), nextTs);
+
+						return serializedVal != null
+								? InstantiationUtil.deserializeFromByteArray(valueSerializer, serializedVal) : null;
+					}
+				}, numSqlRetries, sqlRetrySleep);
+			} catch (IOException e) {
+				// We need to re-throw this exception to conform to the map
+				// interface, we will catch this when we call the the put/get
+				throw new RuntimeException(e);
+			}
+		}
+
+		/**
+		 * If the cache is full we remove the evictionSize least recently
+		 * accessed elements and write them to the database if they were
+		 * modified since the last checkpoint.
+		 */
+		private void evictIfFull() {
+			if (size() > cacheSize) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("State cache is full for {}, evicting {} elements.", kvStateId, evictionSize);
+				}
+				try {
+					int numEvicted = 0;
+
+					Iterator<Entry<Tuple2<K, N>, Optional<V>>> entryIterator = entrySet().iterator();
+					while (numEvicted++ < evictionSize && entryIterator.hasNext()) {
+
+						Entry<Tuple2<K, N>, Optional<V>> next = entryIterator.next();
+
+						// We only need to write to the database if modified
+						if (modified.remove(next.getKey()) != null) {
+							batchInsert.add(next, nextTs);
+						}
+
+						entryIterator.remove();
+					}
+
+					batchInsert.flush(nextTs);
+
+				} catch (IOException e) {
+					// We need to re-throw this exception to conform to the map
+					// interface, we will catch this when we call the the
+					// put/get
+					throw new RuntimeException(e);
+				}
+			}
+		}
+
+		@Override
+		public void putAll(Map<? extends Tuple2<K, N>, ? extends Optional<V>> m) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void clear() {
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	/**
+	 * Handles inserts to the database by batching them together, partitioned
+	 * by the sharding key. A batch is written to the database when it becomes
+	 * full or when the inserter is flushed.
+	 */
+	private class BatchInserter {
+
+		// The kv pairs to be inserted, indexed by shard
+
+		List<Tuple2<byte[], byte[]>>[] inserts;
+
+		@SuppressWarnings("unchecked")
+		public BatchInserter(int numShards) {
+			inserts = new List[numShards];
+			for (int i = 0; i < numShards; i++) {
+				inserts[i] = new ArrayList<>();
+			}
+		}
+
+		public void add(Entry<Tuple2<K, N>, Optional<V>> next, long timestamp) throws IOException {
+
+			K key = next.getKey().f0;
+			N namespace = next.getKey().f1;
+			V value = next.getValue().orNull();
+
+			// Find the insert partition for the key's shard
+			int shardIndex = connections.getShardIndex(key);
+
+			List<Tuple2<byte[], byte[]>> insertPartition = inserts[shardIndex];
+
+			// Add the k-v pair to the partition
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+			keySerializer.serialize(key, out);
+			namespaceSerializer.serialize(namespace, out);
+			out.close();
+
+			byte[] kn = baos.toByteArray();
+			byte[] v = value != null ? InstantiationUtil.serializeToByteArray(valueSerializer, value) : null;
+			insertPartition.add(Tuple2.of(kn, v));
+
+			// If the partition is full, write it to the database and clear it
+			if (insertPartition.size() == maxInsertBatchSize) {
+				dbAdapter.insertBatch(kvStateId, conf,
+						connections.getForIndex(shardIndex),
+						insertStatements.getForIndex(shardIndex),
+						timestamp, insertPartition);
+
+				insertPartition.clear();
+			}
+		}
+
+		public void flush(long timestamp) throws IOException {
+			// We flush all non-empty partitions
+			for (int i = 0; i < inserts.length; i++) {
+				List<Tuple2<byte[], byte[]>> insertPartition = inserts[i];
+				if (!insertPartition.isEmpty()) {
+					dbAdapter.insertBatch(kvStateId, conf, connections.getForIndex(i),
+							insertStatements.getForIndex(i), timestamp, insertPartition);
+					insertPartition.clear();
+				}
+			}
+		}
+	}
+
+	private class Compactor implements Runnable {
+
+		private long upperBound;
+
+		public Compactor(long upperBound) {
+			this.upperBound = upperBound;
+		}
+
+		@Override
+		public void run() {
+			// We create new database connections to make sure we don't
+			// interfere with the checkpointing (connections are not thread
+			// safe)
+			try (ShardedConnection sc = conf.createShardedConnection()) {
+				for (final Connection c : sc.connections()) {
+					SQLRetrier.retry(new Callable<Void>() {
+						@Override
+						public Void call() throws Exception {
+							dbAdapter.compactKvStates(kvStateId, c, lastCompactedTs, upperBound);
+							return null;
+						}
+					}, numSqlRetries, sqlRetrySleep);
+				}
+				if (LOG.isInfoEnabled()) {
+					LOG.info("State successfully compacted for {} between {} and {}.", kvStateId,
+							lastCompactedTs,
+							upperBound);
+				}
+				lastCompactedTs = upperBound;
+			} catch (SQLException | IOException e) {
+				LOG.warn("State compaction failed.", e);
+			}
+		}
+
+	}
+}
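
For orientation, the caching scheme implemented above (an access-ordered map that tracks modified entries and, once over capacity, writes the least recently used modified entries out before dropping them) can be summarized by the following stripped-down, standalone sketch. It is illustrative only and not part of the commit; all class and field names are made up here, and the "database" is simply another in-memory map:

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class WriteBehindCacheSketch<K, V> extends LinkedHashMap<K, V> {

	private static final long serialVersionUID = 1L;

	private final int capacity;
	private final int evictionSize;

	// entries changed since the last flush; only these need to be written out
	private final Map<K, V> modified = new HashMap<>();

	// stand-in for the real database
	private final Map<K, V> database = new HashMap<>();

	public WriteBehindCacheSketch(int capacity, int evictionSize) {
		// accessOrder = true, so iteration starts at the least recently used entry
		super(capacity, 0.75f, true);
		this.capacity = capacity;
		this.evictionSize = evictionSize;
	}

	@Override
	public V put(K key, V value) {
		V old = super.put(key, value);
		modified.put(key, value);
		evictIfFull();
		return old;
	}

	private void evictIfFull() {
		if (size() > capacity) {
			int evicted = 0;
			Iterator<Map.Entry<K, V>> it = entrySet().iterator();
			while (evicted++ < evictionSize && it.hasNext()) {
				Map.Entry<K, V> eldest = it.next();
				// unmodified entries can simply be dropped; they are already in the database
				if (modified.remove(eldest.getKey()) != null) {
					database.put(eldest.getKey(), eldest.getValue());
				}
				it.remove();
			}
		}
	}

	public Map<K, V> databaseContents() {
		return database;
	}

	public static void main(String[] args) {
		WriteBehindCacheSketch<Integer, String> cache = new WriteBehindCacheSketch<>(2, 1);
		cache.put(1, "a");
		cache.put(2, "b");
		cache.put(3, "c"); // cache over capacity: key 1 is written out and evicted
		System.out.println(cache.databaseContents()); // prints {1=a}
	}
}

The real LazyDbValueState additionally keys entries by (key, namespace), serializes keys and values with the configured TypeSerializers, and groups evicted writes per shard through the BatchInserter before they reach the database.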

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
index 9eb3cd5..cf2b5be 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
@@ -195,7 +195,7 @@ public class MySqlAdapter implements DbAdapter {
 	 */
 	protected static void validateStateId(String name) {
 		if (!name.matches("[a-zA-Z0-9_]+")) {
-			throw new RuntimeException("State name contains invalid characters.");
+			throw new RuntimeException("State name contains invalid characters: " + name);
 		}
 	}
 

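As a side note on the check above: state names are accepted only if they match [a-zA-Z0-9_]+, so identifiers containing dashes, dots or spaces are rejected before they can reach the generated SQL. A tiny illustrative check (not part of the commit):

public class StateIdCheckExample {
	public static void main(String[] args) {
		String[] candidates = {"window_counts_1", "window-counts", "my.state", "plain"};
		for (String name : candidates) {
			// same pattern as MySqlAdapter.validateStateId
			boolean valid = name.matches("[a-zA-Z0-9_]+");
			System.out.println(name + " -> " + (valid ? "accepted" : "rejected"));
		}
	}
}
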
http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
index 155ced8..34adf75 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
@@ -18,12 +18,6 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -39,10 +33,20 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.base.Joiner;
 import org.apache.commons.io.FileUtils;
 import org.apache.derby.drda.NetworkServerControl;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.execution.Environment;
@@ -58,6 +62,9 @@ import org.junit.Test;
 
 import com.google.common.base.Optional;
 
+import static org.junit.Assert.*;
+
 public class DbStateBackendTest {
 
 	private static NetworkServerControl server;
@@ -129,7 +136,7 @@ public class DbStateBackendTest {
 		assertFalse(backend.isInitialized());
 
 		Environment env = new DummyEnvironment("test", 1, 0);
-		backend.initializeForJob(env);
+		backend.initializeForJob(env, "dummy-setup-ser", StringSerializer.INSTANCE);
 
 		assertNotNull(backend.getConnections());
 		assertTrue(
@@ -148,7 +155,7 @@ public class DbStateBackendTest {
 		Environment env = new DummyEnvironment("test", 1, 0);
 		DbStateBackend backend = new DbStateBackend(conf);
 
-		backend.initializeForJob(env);
+		backend.initializeForJob(env, "dummy-ser-state", StringSerializer.INSTANCE);
 
 		String state1 = "dummy state";
 		String state2 = "row row row your boat";
@@ -191,16 +198,20 @@ public class DbStateBackendTest {
 
 			Environment env = new DummyEnvironment("test", 2, 0);
 
-			backend.initializeForJob(env);
+			backend.initializeForJob(env, "dummy_test_kv", IntSerializer.INSTANCE);
+
+			ValueState<String> state = backend.createValueState(IntSerializer.INSTANCE,
+				new ValueStateDescriptor<>("state1", null, StringSerializer.INSTANCE));
 
-			LazyDbKvState<Integer, String> kv = backend.createKvState("state1_1", "state1", IntSerializer.INSTANCE,
-					StringSerializer.INSTANCE, null);
+			LazyDbValueState<Integer, Integer, String> kv = (LazyDbValueState<Integer, Integer, String>) state;
 
-			String tableName = "state1_1_" + env.getApplicationID().toShortString();
+			String tableName = "dummy_test_kv_state1";
 			assertTrue(isTableCreated(backend.getConnections().getFirst(), tableName));
 
 			assertEquals(0, kv.size());
 
+			kv.setCurrentNamespace(1);
+
 			// some modifications to the state
 			kv.setCurrentKey(1);
 			assertNull(kv.value());
@@ -225,7 +236,7 @@ public class DbStateBackendTest {
 			kv.update("u3");
 
 			// draw another snapshot
-			KvStateSnapshot<Integer, String, DbStateBackend> snapshot2 = kv.snapshot(682375462379L,
+			KvStateSnapshot<Integer, Integer, ValueState<String>, ValueStateDescriptor<String>, DbStateBackend> snapshot2 = kv.snapshot(682375462379L,
 					200);
 
 			// validate the original state
@@ -238,16 +249,23 @@ public class DbStateBackendTest {
 			assertEquals("u3", kv.value());
 
 			// restore the first snapshot and validate it
-			KvState<Integer, String, DbStateBackend> restored2 = snapshot2.restoreState(backend, IntSerializer.INSTANCE,
-					StringSerializer.INSTANCE, null, getClass().getClassLoader(), 6823754623710L);
+			KvState<Integer, Integer, ValueState<String>, ValueStateDescriptor<String>, DbStateBackend> restored2 = snapshot2.restoreState(
+				backend,
+				IntSerializer.INSTANCE,
+				getClass().getClassLoader(),
+				6823754623710L);
+
+			restored2.setCurrentNamespace(1);
+
+			@SuppressWarnings("unchecked")
+			ValueState<String> restoredState2 = (ValueState<String>) restored2;
 
-			assertEquals(0, restored2.size());
 			restored2.setCurrentKey(1);
-			assertEquals("u1", restored2.value());
+			assertEquals("u1", restoredState2.value());
 			restored2.setCurrentKey(2);
-			assertEquals("u2", restored2.value());
+			assertEquals("u2", restoredState2.value());
 			restored2.setCurrentKey(3);
-			assertEquals("u3", restored2.value());
+			assertEquals("u3", restoredState2.value());
 
 			backend.close();
 		} finally {
@@ -256,6 +274,173 @@ public class DbStateBackendTest {
 	}
 
 	@Test
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public void testListState() {
+		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+		try {
+			FsStateBackend fileBackend = new FsStateBackend(localFileUri(tempDir));
+
+			DbStateBackend backend = new DbStateBackend(conf, fileBackend);
+
+			Environment env = new DummyEnvironment("test", 2, 0);
+
+			backend.initializeForJob(env, "dummy_test_kv_list", IntSerializer.INSTANCE);
+
+			ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", StringSerializer.INSTANCE);
+			ListState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+
+			@SuppressWarnings("unchecked")
+			KvState<Integer, Void, ListState<String>, ListStateDescriptor<String>, DbStateBackend> kv =
+				(KvState<Integer, Void, ListState<String>, ListStateDescriptor<String>, DbStateBackend>) state;
+
+			Joiner joiner = Joiner.on(",");
+			// some modifications to the state
+			kv.setCurrentKey(1);
+			assertEquals("", joiner.join(state.get()));
+			state.add("1");
+			kv.setCurrentKey(2);
+			assertEquals("", joiner.join(state.get()));
+			state.add("2");
+			kv.setCurrentKey(1);
+			assertEquals("1", joiner.join(state.get()));
+
+			// draw a snapshot
+			KvStateSnapshot<Integer, Void, ListState<String>, ListStateDescriptor<String>, DbStateBackend> snapshot1 =
+				kv.snapshot(682375462378L, 2);
+
+			// make some more modifications
+			kv.setCurrentKey(1);
+			state.add("u1");
+			kv.setCurrentKey(2);
+			state.add("u2");
+			kv.setCurrentKey(3);
+			state.add("u3");
+
+			// draw another snapshot
+			KvStateSnapshot<Integer, Void, ListState<String>, ListStateDescriptor<String>, DbStateBackend> snapshot2 =
+				kv.snapshot(682375462379L, 4);
+
+			// validate the original state
+			kv.setCurrentKey(1);
+			assertEquals("1,u1", joiner.join(state.get()));
+			kv.setCurrentKey(2);
+			assertEquals("2,u2", joiner.join(state.get()));
+			kv.setCurrentKey(3);
+			assertEquals("u3", joiner.join(state.get()));
+
+			kv.dispose();
+
+			// restore the second snapshot and validate it
+			KvState<Integer, Void, ListState<String>, ListStateDescriptor<String>, DbStateBackend> restored2 = snapshot2.restoreState(
+				backend,
+				IntSerializer.INSTANCE,
+				this.getClass().getClassLoader(), 20);
+
+			@SuppressWarnings("unchecked")
+			ListState<String> restored2State = (ListState<String>) restored2;
+
+			restored2.setCurrentKey(1);
+			assertEquals("1,u1", joiner.join(restored2State.get()));
+			restored2.setCurrentKey(2);
+			assertEquals("2,u2", joiner.join(restored2State.get()));
+			restored2.setCurrentKey(3);
+			assertEquals("u3", joiner.join(restored2State.get()));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public void testReducingState() {
+		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+		try {
+			FsStateBackend fileBackend = new FsStateBackend(localFileUri(tempDir));
+
+			DbStateBackend backend = new DbStateBackend(conf, fileBackend);
+
+			Environment env = new DummyEnvironment("test", 2, 0);
+
+			backend.initializeForJob(env, "dummy_test_kv_reduce", IntSerializer.INSTANCE);
+
+			ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id",
+				new ReduceFunction<String>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public String reduce(String value1, String value2) throws Exception {
+						return value1 + "," + value2;
+					}
+				},
+				StringSerializer.INSTANCE);
+			ReducingState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId);
+
+			@SuppressWarnings("unchecked")
+			KvState<Integer, Void, ReducingState<String>, ReducingStateDescriptor<String>, DbStateBackend> kv =
+				(KvState<Integer, Void, ReducingState<String>, ReducingStateDescriptor<String>, DbStateBackend>) state;
+
+			Joiner joiner = Joiner.on(",");
+			// some modifications to the state
+			kv.setCurrentKey(1);
+			assertEquals(null, state.get());
+			state.add("1");
+			kv.setCurrentKey(2);
+			assertEquals(null, state.get());
+			state.add("2");
+			kv.setCurrentKey(1);
+			assertEquals("1", state.get());
+
+			// draw a snapshot
+			KvStateSnapshot<Integer, Void, ReducingState<String>, ReducingStateDescriptor<String>, DbStateBackend> snapshot1 =
+				kv.snapshot(682375462378L, 2);
+
+			// make some more modifications
+			kv.setCurrentKey(1);
+			state.add("u1");
+			kv.setCurrentKey(2);
+			state.add("u2");
+			kv.setCurrentKey(3);
+			state.add("u3");
+
+			// draw another snapshot
+			KvStateSnapshot<Integer, Void, ReducingState<String>, ReducingStateDescriptor<String>, DbStateBackend> snapshot2 =
+				kv.snapshot(682375462379L, 4);
+
+			// validate the original state
+			kv.setCurrentKey(1);
+			assertEquals("1,u1", state.get());
+			kv.setCurrentKey(2);
+			assertEquals("2,u2", state.get());
+			kv.setCurrentKey(3);
+			assertEquals("u3", state.get());
+
+			kv.dispose();
+
+			// restore the second snapshot and validate it
+			KvState<Integer, Void, ReducingState<String>, ReducingStateDescriptor<String>, DbStateBackend> restored2 = snapshot2.restoreState(
+				backend,
+				IntSerializer.INSTANCE,
+				this.getClass().getClassLoader(), 20);
+
+			@SuppressWarnings("unchecked")
+			ReducingState<String> restored2State = (ReducingState<String>) restored2;
+
+			restored2.setCurrentKey(1);
+			assertEquals("1,u1", restored2State.get());
+			restored2.setCurrentKey(2);
+			assertEquals("2,u2", restored2State.get());
+			restored2.setCurrentKey(3);
+			assertEquals("u3", restored2State.get());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
 	public void testCompaction() throws Exception {
 		DbBackendConfig conf = new DbBackendConfig("flink", "flink", url1);
 		MockAdapter adapter = new MockAdapter();
@@ -266,13 +451,17 @@ public class DbStateBackendTest {
 		DbStateBackend backend2 = new DbStateBackend(conf);
 		DbStateBackend backend3 = new DbStateBackend(conf);
 
-		backend1.initializeForJob(new DummyEnvironment("test", 3, 0));
-		backend2.initializeForJob(new DummyEnvironment("test", 3, 1));
-		backend3.initializeForJob(new DummyEnvironment("test", 3, 2));
+		backend1.initializeForJob(new DummyEnvironment("test", 3, 0), "dummy_1", StringSerializer.INSTANCE);
+		backend2.initializeForJob(new DummyEnvironment("test", 3, 1), "dummy_2", StringSerializer.INSTANCE);
+		backend3.initializeForJob(new DummyEnvironment("test", 3, 2), "dummy_3", StringSerializer.INSTANCE);
 
-		LazyDbKvState<?, ?> s1 = backend1.createKvState("a_1", "a", null, null, null);
-		LazyDbKvState<?, ?> s2 = backend2.createKvState("a_1", "a", null, null, null);
-		LazyDbKvState<?, ?> s3 = backend3.createKvState("a_1", "a", null, null, null);
+		ValueState<String> s1State = backend1.createValueState(StringSerializer.INSTANCE, new ValueStateDescriptor<>("a1", null, StringSerializer.INSTANCE));
+		ValueState<String> s2State = backend2.createValueState(StringSerializer.INSTANCE, new ValueStateDescriptor<>("a2", null, StringSerializer.INSTANCE));
+		ValueState<String> s3State = backend3.createValueState(StringSerializer.INSTANCE, new ValueStateDescriptor<>("a3", null, StringSerializer.INSTANCE));
+
+		LazyDbValueState<?, ?, ?> s1 = (LazyDbValueState<?, ?, ?>) s1State;
+		LazyDbValueState<?, ?, ?> s2 = (LazyDbValueState<?, ?, ?>) s2State;
+		LazyDbValueState<?, ?, ?> s3 = (LazyDbValueState<?, ?, ?>) s3State;
 
 		assertTrue(s1.isCompactor());
 		assertFalse(s2.isCompactor());
@@ -324,23 +513,27 @@ public class DbStateBackendTest {
 
 		Environment env = new DummyEnvironment("test", 2, 0);
 
-		String tableName = "state1_1_" + env.getApplicationID().toShortString();
+		String tableName = "dummy_test_caching_state1";
 		assertFalse(isTableCreated(DriverManager.getConnection(url1, "flink", "flink"), tableName));
 		assertFalse(isTableCreated(DriverManager.getConnection(url2, "flink", "flink"), tableName));
 
-		backend.initializeForJob(env);
+		backend.initializeForJob(env, "dummy_test_caching", IntSerializer.INSTANCE);
+
+		ValueState<String> state = backend.createValueState(IntSerializer.INSTANCE,
+			new ValueStateDescriptor<>("state1", "a", StringSerializer.INSTANCE));
 
-		LazyDbKvState<Integer, String> kv = backend.createKvState("state1_1", "state1", IntSerializer.INSTANCE,
-				StringSerializer.INSTANCE, "a");
+		LazyDbValueState<Integer, Integer, String> kv = (LazyDbValueState<Integer, Integer, String>) state;
 
 		assertTrue(isTableCreated(DriverManager.getConnection(url1, "flink", "flink"), tableName));
 		assertTrue(isTableCreated(DriverManager.getConnection(url2, "flink", "flink"), tableName));
 
-		Map<Integer, Optional<String>> cache = kv.getStateCache();
-		Map<Integer, Optional<String>> modified = kv.getModified();
+		Map<Tuple2<Integer, Integer>, Optional<String>> cache = kv.getStateCache();
+		Map<Tuple2<Integer, Integer>, Optional<String>> modified = kv.getModified();
 
 		assertEquals(0, kv.size());
 
+		kv.setCurrentNamespace(1);
+
 		// some modifications to the state
 		kv.setCurrentKey(1);
 		assertEquals("a", kv.value());
@@ -360,24 +553,24 @@ public class DbStateBackendTest {
 		kv.update("3");
 		assertEquals("3", kv.value());
 
-		assertTrue(modified.containsKey(1));
-		assertTrue(modified.containsKey(2));
-		assertTrue(modified.containsKey(3));
+		assertTrue(modified.containsKey(Tuple2.of(1, 1)));
+		assertTrue(modified.containsKey(Tuple2.of(2, 1)));
+		assertTrue(modified.containsKey(Tuple2.of(3, 1)));
 
 		// 1,2 should be evicted as the cache filled
 		kv.setCurrentKey(4);
 		kv.update("4");
 		assertEquals("4", kv.value());
 
-		assertFalse(modified.containsKey(1));
-		assertFalse(modified.containsKey(2));
-		assertTrue(modified.containsKey(3));
-		assertTrue(modified.containsKey(4));
+		assertFalse(modified.containsKey(Tuple2.of(1, 1)));
+		assertFalse(modified.containsKey(Tuple2.of(2, 1)));
+		assertTrue(modified.containsKey(Tuple2.of(3, 1)));
+		assertTrue(modified.containsKey(Tuple2.of(4, 1)));
 
-		assertEquals(Optional.of("3"), cache.get(3));
-		assertEquals(Optional.of("4"), cache.get(4));
-		assertFalse(cache.containsKey(1));
-		assertFalse(cache.containsKey(2));
+		assertEquals(Optional.of("3"), cache.get(Tuple2.of(3, 1)));
+		assertEquals(Optional.of("4"), cache.get(Tuple2.of(4, 1)));
+		assertFalse(cache.containsKey(Tuple2.of(1, 1)));
+		assertFalse(cache.containsKey(Tuple2.of(2, 1)));
 
 		// draw a snapshot
 		kv.snapshot(682375462378L, 100);
@@ -390,19 +583,19 @@ public class DbStateBackendTest {
 		kv.update(null);
 		assertEquals("a", kv.value());
 
-		assertTrue(modified.containsKey(2));
+		assertTrue(modified.containsKey(Tuple2.of(2, 1)));
 		assertEquals(1, modified.size());
 
-		assertEquals(Optional.of("3"), cache.get(3));
-		assertEquals(Optional.of("4"), cache.get(4));
-		assertEquals(Optional.absent(), cache.get(2));
-		assertFalse(cache.containsKey(1));
+		assertEquals(Optional.of("3"), cache.get(Tuple2.of(3, 1)));
+		assertEquals(Optional.of("4"), cache.get(Tuple2.of(4, 1)));
+		assertEquals(Optional.absent(), cache.get(Tuple2.of(2, 1)));
+		assertFalse(cache.containsKey(Tuple2.of(1, 1)));
 
-		assertTrue(modified.containsKey(2));
-		assertFalse(modified.containsKey(3));
-		assertFalse(modified.containsKey(4));
-		assertTrue(cache.containsKey(3));
-		assertTrue(cache.containsKey(4));
+		assertTrue(modified.containsKey(Tuple2.of(2, 1)));
+		assertFalse(modified.containsKey(Tuple2.of(3, 1)));
+		assertFalse(modified.containsKey(Tuple2.of(4, 1)));
+		assertTrue(cache.containsKey(Tuple2.of(3, 1)));
+		assertTrue(cache.containsKey(Tuple2.of(4, 1)));
 
 		// clear cache from initial keys
 
@@ -413,14 +606,14 @@ public class DbStateBackendTest {
 		kv.setCurrentKey(7);
 		kv.value();
 
-		assertFalse(modified.containsKey(5));
-		assertTrue(modified.containsKey(6));
-		assertTrue(modified.containsKey(7));
+		assertFalse(modified.containsKey(Tuple2.of(5, 1)));
+		assertTrue(modified.containsKey(Tuple2.of(6, 1)));
+		assertTrue(modified.containsKey(Tuple2.of(7, 1)));
 
-		assertFalse(cache.containsKey(1));
-		assertFalse(cache.containsKey(2));
-		assertFalse(cache.containsKey(3));
-		assertFalse(cache.containsKey(4));
+		assertFalse(cache.containsKey(Tuple2.of(1, 1)));
+		assertFalse(cache.containsKey(Tuple2.of(2, 1)));
+		assertFalse(cache.containsKey(Tuple2.of(3, 1)));
+		assertFalse(cache.containsKey(Tuple2.of(4, 1)));
 
 		kv.setCurrentKey(2);
 		assertEquals("a", kv.value());
@@ -428,7 +621,8 @@ public class DbStateBackendTest {
 		long checkpointTs = System.currentTimeMillis();
 
 		// Draw a snapshot that we will restore later
-		KvStateSnapshot<Integer, String, DbStateBackend> snapshot1 = kv.snapshot(682375462379L, checkpointTs);
+		KvStateSnapshot<Integer, Integer, ValueState<String>, ValueStateDescriptor<String>, DbStateBackend> snapshot1 = kv.snapshot(682375462379L, checkpointTs);
+
 		assertTrue(modified.isEmpty());
 
 		// Do some updates then draw another snapshot (imitate a partial
@@ -448,17 +642,35 @@ public class DbStateBackendTest {
 
 		// restore the second snapshot and validate it (we set a new default
 		// value here to make sure that the default wasn't written)
-		KvState<Integer, String, DbStateBackend> restored = snapshot1.restoreState(backend, IntSerializer.INSTANCE,
-				StringSerializer.INSTANCE, "b", getClass().getClassLoader(), 6823754623711L);
+		KvState<Integer, Integer, ValueState<String>, ValueStateDescriptor<String>, DbStateBackend> restored = snapshot1.restoreState(
+			backend,
+			IntSerializer.INSTANCE,
+			getClass().getClassLoader(),
+			6823754623711L);
+
+		LazyDbValueState<Integer, Integer, String> lazyRestored = (LazyDbValueState<Integer, Integer, String>) restored;
+
+		cache = lazyRestored.getStateCache();
+		modified = lazyRestored.getModified();
+
+		restored.setCurrentNamespace(1);
+
+		@SuppressWarnings("unchecked")
+		ValueState<String> restoredState = (ValueState<String>) restored;
 
 		restored.setCurrentKey(1);
-		assertEquals("b", restored.value());
+
+		assertEquals("a", restoredState.value());
+		// make sure that we got the default and not some value from the db
+		assertEquals(Optional.<String>absent(), cache.get(Tuple2.of(1, 1)));
 		restored.setCurrentKey(2);
-		assertEquals("b", restored.value());
+		assertEquals("a", restoredState.value());
+		// make sure that we got the default and not some value from the db
+		assertEquals(Optional.<String>absent(), cache.get(Tuple2.of(2, 1)));
 		restored.setCurrentKey(3);
-		assertEquals("3", restored.value());
+		assertEquals("3", restoredState.value());
 		restored.setCurrentKey(4);
-		assertEquals("4", restored.value());
+		assertEquals("4", restoredState.value());
 
 		backend.close();
 	}
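
Taken together, the tests above reduce to the following pattern for obtaining the new state types from an initialized DbStateBackend. This is a sketch only: the backend and serializer setup is assumed to match the tests, the ValueState variant is shown via getPartitionedState() by analogy (the tests obtain it through createValueState()), and all state names are illustrative:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.contrib.streaming.state.DbStateBackend;

public class StateDescriptorSketch {

	static void createStates(DbStateBackend backend) throws Exception {
		// ValueState: one value per key, here with default "none"
		ValueState<String> value = backend.getPartitionedState(null, VoidSerializer.INSTANCE,
			new ValueStateDescriptor<>("value-state", "none", StringSerializer.INSTANCE));

		// ListState: keeps every element that is added
		ListState<String> list = backend.getPartitionedState(null, VoidSerializer.INSTANCE,
			new ListStateDescriptor<>("list-state", StringSerializer.INSTANCE));

		// ReducingState: folds added elements into one value with the ReduceFunction
		ReducingState<Long> sum = backend.getPartitionedState(null, VoidSerializer.INSTANCE,
			new ReducingStateDescriptor<>("sum-state", new ReduceFunction<Long>() {
				private static final long serialVersionUID = 1L;

				@Override
				public Long reduce(Long a, Long b) {
					return a + b;
				}
			}, LongSerializer.INSTANCE));

		// As in the tests, a current key (and namespace) must be set on the
		// underlying KvState before value.update(), list.add() or sum.add()
		// are called.
	}
}

The (null, VoidSerializer.INSTANCE, descriptor) call shape mirrors the list and reducing state tests above, which do not use namespaces.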

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
index 02c1a3e..53d8d50 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
@@ -69,7 +69,6 @@ public class DerbyAdapter extends MySqlAdapter {
 	 */
 	@Override
 	public void createKVStateTable(String stateId, Connection con) throws SQLException {
-
 		validateStateId(stateId);
 		try (Statement smt = con.createStatement()) {
 			smt.executeUpdate(

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-contrib/flink-streaming-contrib/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/resources/log4j-test.properties b/flink-contrib/flink-streaming-contrib/src/test/resources/log4j-test.properties
index 0b686e5..45a18ec 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/resources/log4j-test.properties
+++ b/flink-contrib/flink-streaming-contrib/src/test/resources/log4j-test.properties
@@ -17,11 +17,11 @@
 ################################################################################
 
 # Set root logger level to DEBUG and its only appender to A1.
 log4j.rootLogger=OFF, A1
 
 # A1 is set to be a ConsoleAppender.
 log4j.appender.A1=org.apache.log4j.ConsoleAppender
 
 # A1 uses PatternLayout.
 log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

