flink-commits mailing list archives

From gyf...@apache.org
Subject [4/8] flink git commit: [FLINK-2924] [streaming] Out-of-core state backend for JDBC databases
Date Tue, 24 Nov 2015 09:45:06 GMT
[FLINK-2924] [streaming] Out-of-core state backend for JDBC databases


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/75a7c4b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/75a7c4b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/75a7c4b1

Branch: refs/heads/master
Commit: 75a7c4b109be68c02b1c7733310f4f9000ad8bf6
Parents: ad6f826
Author: Gyula Fora <gyfora@apache.org>
Authored: Wed Nov 4 11:12:25 2015 +0100
Committer: Gyula Fora <gyfora@apache.org>
Committed: Tue Nov 24 09:28:39 2015 +0100

----------------------------------------------------------------------
 flink-contrib/flink-streaming-contrib/pom.xml   |  24 +
 .../contrib/streaming/state/DbAdapter.java      | 170 +++++++
 .../streaming/state/DbBackendConfig.java        | 439 +++++++++++++++++
 .../contrib/streaming/state/DbStateBackend.java | 251 ++++++++++
 .../contrib/streaming/state/DbStateHandle.java  |  89 ++++
 .../contrib/streaming/state/LazyDbKvState.java  | 488 +++++++++++++++++++
 .../contrib/streaming/state/MySqlAdapter.java   | 242 +++++++++
 .../contrib/streaming/state/SQLRetrier.java     | 176 +++++++
 .../state/DBStateCheckpointingTest.java         | 256 ++++++++++
 .../streaming/state/DbStateBackendTest.java     | 465 ++++++++++++++++++
 .../contrib/streaming/state/DerbyAdapter.java   | 175 +++++++
 .../checkpoint/CheckpointCoordinator.java       |   8 +-
 .../runtime/checkpoint/CheckpointIDCounter.java |   2 +
 .../StandaloneCheckpointIDCounter.java          |   5 +
 .../ZooKeeperCheckpointIDCounter.java           |  11 +
 .../deployment/TaskDeploymentDescriptor.java    |  13 +-
 .../flink/runtime/executiongraph/Execution.java |   7 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   1 +
 .../runtime/executiongraph/ExecutionVertex.java |   5 +-
 .../runtime/jobgraph/tasks/StatefulTask.java    |   3 +-
 .../flink/runtime/state/KvStateSnapshot.java    |   3 +-
 .../apache/flink/runtime/state/StateUtils.java  |   6 +-
 .../state/filesystem/FsHeapKvStateSnapshot.java |   3 +-
 .../state/memory/MemoryHeapKvStateSnapshot.java |   3 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   6 +-
 .../checkpoint/CheckpointStateRestoreTest.java  |  48 +-
 .../runtime/state/FileStateBackendTest.java     |  10 +-
 .../runtime/state/MemoryStateBackendTest.java   |  10 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   2 +-
 .../api/operators/AbstractStreamOperator.java   |  21 +-
 .../operators/AbstractUdfStreamOperator.java    |   4 +-
 .../streaming/api/operators/StreamOperator.java |   4 +-
 ...ractAlignedProcessingTimeWindowOperator.java |   4 +-
 .../windowing/NonKeyedWindowOperator.java       |   4 +-
 .../operators/windowing/WindowOperator.java     |   4 +-
 .../streaming/runtime/tasks/StreamTask.java     |  13 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |   4 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |   4 +-
 38 files changed, 2910 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-contrib/flink-streaming-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/pom.xml b/flink-contrib/flink-streaming-contrib/pom.xml
index 68e65f6..22b11b2 100644
--- a/flink-contrib/flink-streaming-contrib/pom.xml
+++ b/flink-contrib/flink-streaming-contrib/pom.xml
@@ -53,6 +53,30 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.derby</groupId>
+			<artifactId>derbyclient</artifactId>
+			<version>10.12.1.1</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.derby</groupId>
+			<artifactId>derbynet</artifactId>
+			<version>10.12.1.1</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
new file mode 100644
index 0000000..7383bae
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+public interface DbAdapter {
+
+	/**
+	 * Initialize tables for storing non-partitioned checkpoints for the given
+	 * job id and database connection.
+	 * 
+	 */
+	void createCheckpointsTable(String jobId, Connection con) throws SQLException;
+
+	/**
+	 * Checkpoints will be inserted into the database using prepared statements.
+	 * This method should prepare and return the statement that will later be
+	 * used to insert checkpoints using the given connection.
+	 * 
+	 */
+	PreparedStatement prepareCheckpointInsert(String jobId, Connection con) throws SQLException;
+
+	/**
+	 * Set the {@link PreparedStatement} parameters for the statement returned
+	 * by {@link #prepareCheckpointInsert(String, Connection)}.
+	 * 
+	 * @param jobId
+	 *            Id of the current job.
+	 * @param insertStatement
+	 *            Statement returned by
+	 *            {@link #prepareCheckpointInsert(String, Connection)}.
+	 * @param checkpointId
+	 *            Global checkpoint id.
+	 * @param timestamp
+	 *            Global checkpoint timestamp.
+	 * @param handleId
+	 *            Unique id assigned to this state checkpoint (should be primary
+	 *            key).
+	 * @param checkpoint
+	 *            The serialized checkpoint.
+	 * @throws SQLException
+	 */
+	void setCheckpointInsertParams(String jobId, PreparedStatement insertStatement, long checkpointId,
+			long timestamp, long handleId, byte[] checkpoint) throws SQLException;
+
+	/**
+	 * Retrieve the serialized checkpoint data from the database.
+	 * 
+	 * @param jobId
+	 *            Id of the current job.
+	 * @param con
+	 *            Database connection
+	 * @param checkpointId
+	 *            Global checkpoint id.
+	 * @param checkpointTs
+	 *            Global checkpoint timestamp.
+	 * @param handleId
+	 *            Unique id assigned to this state checkpoint (should be primary
+	 *            key).
+	 * @return The byte[] corresponding to the checkpoint or null if missing.
+	 * @throws SQLException
+	 */
+	byte[] getCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
+			throws SQLException;
+
+	/**
+	 * Remove the given checkpoint from the database.
+	 * 
+	 * @param jobId
+	 *            Id of the current job.
+	 * @param con
+	 *            Database connection
+	 * @param checkpointId
+	 *            Global checkpoint id.
+	 * @param checkpointTs
+	 *            Global checkpoint timestamp.
+	 * @param handleId
+	 *            Unique id assigned to this state checkpoint (should be primary
+	 *            key).
+	 * @throws SQLException
+	 */
+	void deleteCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
+			throws SQLException;
+
+	/**
+	 * Remove all states for the given JobId, for instance by dropping the
+	 * entire table.
+	 * 
+	 * @throws SQLException
+	 */
+	void disposeAllStateForJob(String jobId, Connection con) throws SQLException;
+
+	/**
+	 * Initialize the necessary tables for the given stateId. The state id
+	 * consists of the JobId+OperatorId+StateName.
+	 * 
+	 */
+	void createKVStateTable(String stateId, Connection con) throws SQLException;
+
+	/**
+	 * Prepare the statement that will be used to insert key-value pairs into
+	 * the database.
+	 * 
+	 */
+	PreparedStatement prepareKVCheckpointInsert(String stateId, Connection con) throws SQLException;
+
+	/**
+	 * Prepare the statement that will be used to lookup keys from the database.
+	 * Keys and values are assumed to be byte arrays.
+	 * 
+	 */
+	PreparedStatement prepareKeyLookup(String stateId, Connection con) throws SQLException;
+
+	/**
+	 * Retrieve the latest value from the database for a given key and
+	 * checkpointId.
+	 * 
+	 * @param stateId
+	 *            Unique identifier of the kvstate (usually the table name).
+	 * @param lookupStatement
+	 *            The statement returned by
+	 *            {@link #prepareKeyLookup(String, Connection)}.
+	 * @param key
+	 *            The key to lookup.
+	 * @return The latest valid value for the key.
+	 * @throws SQLException
+	 */
+	byte[] lookupKey(String stateId, PreparedStatement lookupStatement, byte[] key, long lookupId)
+			throws SQLException;
+
+	/**
+	 * Clean up states between the current and next checkpoint id. Everything
+	 * with an id larger than the current and smaller than the next should be removed.
+	 * 
+	 */
+	void cleanupFailedCheckpoints(String stateId, Connection con, long checkpointId,
+			long nextId) throws SQLException;
+
+	/**
+	 * Insert a list of Key-Value pairs into the database. The suggested
+	 * approach is to use idempotent inserts (updates) as one batch operation.
+	 * 
+	 */
+	void insertBatch(String stateId, DbBackendConfig conf, Connection con, PreparedStatement insertStatement,
+			long checkpointId, List<Tuple2<byte[], byte[]>> toInsert) throws IOException;
+
+}
\ No newline at end of file

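A minimal usage sketch of the DbAdapter contract above, assuming the classes from this commit are imported; the JDBC url, credentials, job id and state bytes are placeholders, not part of the commit:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Random;

public class DbAdapterUsageSketch {
	public static void main(String[] args) throws Exception {
		DbAdapter adapter = new MySqlAdapter();
		try (Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/flinkdb", "user", "pw")) {
			String jobId = "someJobId";

			// Create the checkpoints table once and prepare the reusable insert statement.
			adapter.createCheckpointsTable(jobId, con);
			PreparedStatement insert = adapter.prepareCheckpointInsert(jobId, con);

			long checkpointId = 1;
			long timestamp = System.currentTimeMillis();
			long handleId = new Random().nextLong();
			byte[] serializedState = new byte[] { 1, 2, 3 };

			// Bind the parameters for this checkpoint and execute the insert.
			adapter.setCheckpointInsertParams(jobId, insert, checkpointId, timestamp, handleId, serializedState);
			insert.executeUpdate();

			// The checkpoint bytes can later be fetched or removed again via the same ids.
			byte[] restored = adapter.getCheckpoint(jobId, con, checkpointId, timestamp, handleId);
			adapter.deleteCheckpoint(jobId, con, checkpointId, timestamp, handleId);
		}
	}
}

The insert statement is prepared once and reused across checkpoints, which is why preparing it is kept separate from binding the per-checkpoint parameters.
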
http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java
new file mode 100644
index 0000000..f7fe5dc
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+/**
+ * 
+ * Configuration object for {@link DbStateBackend}, containing information to
+ * shard and connect to the databases that will store the state checkpoints.
+ *
+ */
+public class DbBackendConfig implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	// Database connection properties
+	private final String userName;
+	private final String userPassword;
+	private final List<String> shardUrls;
+
+	// JDBC Driver + DbAdapter information
+	private Class<? extends MySqlAdapter> dbAdapterClass = MySqlAdapter.class;
+	private String JDBCDriver = null;
+
+	private int maxNumberOfSqlRetries = 5;
+	private int sleepBetweenSqlRetries = 100;
+
+	// KvState properties
+	private int kvStateCacheSize = 10000;
+	private int maxKvInsertBatchSize = 1000;
+	private float maxKvEvictFraction = 0.1f;
+	private int kvStateCompactionFreq = -1;
+
+	/**
+	 * Creates a new sharded database state backend configuration with the given
+	 * parameters and default {@link MySqlAdapter}.
+	 * 
+	 * @param dbUserName
+	 *            The username used to connect to the database at the given url.
+	 * @param dbUserPassword
+	 *            The password used to connect to the database at the given url
+	 *            and username.
+	 * @param dbShardUrls
+	 *            The list of JDBC urls of the databases that will be used as
+	 *            shards for the state backend. Sharding of the state will
+	 *            happen based on the subtask index of the given task.
+	 */
+	public DbBackendConfig(String dbUserName, String dbUserPassword, List<String> dbShardUrls) {
+		this.userName = dbUserName;
+		this.userPassword = dbUserPassword;
+		this.shardUrls = dbShardUrls;
+	}
+
+	/**
+	 * Creates a new database state backend configuration with the given
+	 * parameters and default {@link MySqlAdapter}.
+	 * 
+	 * @param dbUserName
+	 *            The username used to connect to the database at the given url.
+	 * @param dbUserPassword
+	 *            The password used to connect to the database at the given url
+	 *            and username.
+	 * @param dbUrl
+	 *            The JDBC url of the database for example
+	 *            "jdbc:mysql://localhost:3306/flinkdb".
+	 */
+	public DbBackendConfig(String dbUserName, String dbUserPassword, String dbUrl) {
+		this(dbUserName, dbUserPassword, Lists.newArrayList(dbUrl));
+	}
+
+	/**
+	 * The username used to connect to the database at the given urls.
+	 */
+	public String getUserName() {
+		return userName;
+	}
+
+	/**
+	 * The password used to connect to the database at the given url and
+	 * username.
+	 */
+	public String getUserPassword() {
+		return userPassword;
+	}
+
+	/**
+	 * Number of database shards defined.
+	 */
+	public int getNumberOfShards() {
+		return shardUrls.size();
+	}
+
+	/**
+	 * Database shard urls as provided in the constructor.
+	 * 
+	 */
+	public List<String> getShardUrls() {
+		return shardUrls;
+	}
+
+	/**
+	 * The url of the first shard.
+	 * 
+	 */
+	public String getUrl() {
+		return getShardUrl(0);
+	}
+
+	/**
+	 * The url of a specific shard.
+	 * 
+	 */
+	public String getShardUrl(int shardIndex) {
+		validateShardIndex(shardIndex);
+		return shardUrls.get(shardIndex);
+	}
+
+	/**
+	 * Get an instance of the {@link MySqlAdapter} that will be used to operate on
+	 * the database during checkpointing.
+	 * 
+	 * @return An instance of the class set in {@link #setDbAdapterClass(Class)}
+	 *         or a {@link MySqlAdapter} instance if a custom class was not set.
+	 */
+	public MySqlAdapter getDbAdapter() {
+		try {
+			return dbAdapterClass.newInstance();
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * The class name that should be used to load the JDBC driver using
+	 * Class.forName(JDBCDriverClass).
+	 */
+	public String getJDBCDriver() {
+		return JDBCDriver;
+	}
+
+	/**
+	 * Set the class name that should be used to load the JDBC driver using
+	 * Class.forName(JDBCDriverClass).
+	 */
+	public void setJDBCDriver(String jDBCDriverClassName) {
+		JDBCDriver = jDBCDriverClassName;
+	}
+
+	/**
+	 * Get the Class that will be used to instantiate the {@link MySqlAdapter} for
+	 * the {@link #getDbAdapter()} method.
+	 * 
+	 */
+	public Class<? extends MySqlAdapter> getDbAdapterClass() {
+		return dbAdapterClass;
+	}
+
+	/**
+	 * Set the Class that will be used to instantiate the {@link MySqlAdapter} for
+	 * the {@link #getDbAdapter()} method. The class should have an empty
+	 * constructor.
+	 * 
+	 */
+	public void setDbAdapterClass(Class<? extends MySqlAdapter> dbAdapterClass) {
+		this.dbAdapterClass = dbAdapterClass;
+	}
+
+	/**
+	 * The maximum number of key-value pairs stored in one task instance's cache
+	 * before evicting to the underlying database.
+	 *
+	 */
+	public int getKvCacheSize() {
+		return kvStateCacheSize;
+	}
+
+	/**
+	 * Set the maximum number of key-value pairs stored in one task instance's
+	 * cache before evicting to the underlying database. When the cache is full
+	 * the N least recently used keys will be evicted to the database, where N =
+	 * maxKvEvictFraction*KvCacheSize.
+	 *
+	 */
+	public void setKvCacheSize(int size) {
+		kvStateCacheSize = size;
+	}
+
+	/**
+	 * The maximum number of key-value pairs inserted in the database as one
+	 * batch operation.
+	 */
+	public int getMaxKvInsertBatchSize() {
+		return maxKvInsertBatchSize;
+	}
+
+	/**
+	 * Set the maximum number of key-value pairs inserted in the database as one
+	 * batch operation.
+	 */
+	public void setMaxKvInsertBatchSize(int size) {
+		maxKvInsertBatchSize = size;
+	}
+
+	/**
+	 * Sets the maximum fraction of key-value states evicted from the cache if
+	 * the cache is full.
+	 */
+	public void setMaxKvCacheEvictFraction(float fraction) {
+		if (fraction > 1 || fraction <= 0) {
+			throw new RuntimeException("Must be a number between 0 and 1");
+		} else {
+			maxKvEvictFraction = fraction;
+		}
+	}
+
+	/**
+	 * The maximum fraction of key-value states evicted from the cache if the
+	 * cache is full.
+	 */
+	public float getMaxKvCacheEvictFraction() {
+		return maxKvEvictFraction;
+	}
+
+	/**
+	 * The number of elements that will be evicted when the cache is full.
+	 * 
+	 */
+	public int getNumElementsToEvict() {
+		return (int) Math.ceil(getKvCacheSize() * getMaxKvCacheEvictFraction());
+	}
+
+	/**
+	 * Sets how often automatic compaction will be performed on the database to
+	 * remove old overwritten state changes. The frequency is set in terms of
+	 * number of successful checkpoints between two compactions and should take
+	 * the state size and checkpoint frequency into account.
+	 * <p>
+	 * By default automatic compaction is turned off.
+	 */
+	public void setKvStateCompactionFrequency(int compactEvery) {
+		this.kvStateCompactionFreq = compactEvery;
+	}
+
+	/**
+	 * Returns how often automatic compaction is performed on the database to
+	 * remove old overwritten state changes. The frequency is set in terms of
+	 * number of successful checkpoints between two compactions and should take
+	 * the state size and checkpoint frequency into account.
+	 * <p>
+	 * By default automatic compaction is turned off.
+	 */
+	public int getKvStateCompactionFrequency() {
+		return kvStateCompactionFreq;
+	}
+
+	/**
+	 * The number of times each SQL command will be retried on failure.
+	 */
+	public int getMaxNumberOfSqlRetries() {
+		return maxNumberOfSqlRetries;
+	}
+
+	/**
+	 * Sets the number of times each SQL command will be retried on failure.
+	 */
+	public void setMaxNumberOfSqlRetries(int maxNumberOfSqlRetries) {
+		this.maxNumberOfSqlRetries = maxNumberOfSqlRetries;
+	}
+
+	/**
+	 * The number of milliseconds slept between two SQL retries. The actual
+	 * sleep time will be chosen randomly between 1 and the given time.
+	 * 
+	 */
+	public int getSleepBetweenSqlRetries() {
+		return sleepBetweenSqlRetries;
+	}
+
+	/**
+	 * Sets the number of milliseconds slept between two SQL retries. The actual
+	 * sleep time will be chosen randomly between 1 and the given time.
+	 * 
+	 */
+	public void setSleepBetweenSqlRetries(int sleepBetweenSqlRetries) {
+		this.sleepBetweenSqlRetries = sleepBetweenSqlRetries;
+	}
+
+	/**
+	 * Creates a new {@link Connection} using the set parameters for the first
+	 * shard.
+	 * 
+	 * @throws SQLException
+	 */
+	public Connection createConnection() throws SQLException {
+		return createConnection(0);
+	}
+
+	/**
+	 * Creates a new {@link Connection} using the set parameters for the given
+	 * shard index.
+	 * 
+	 * @throws SQLException
+	 */
+	public Connection createConnection(int shardIndex) throws SQLException {
+		validateShardIndex(shardIndex);
+		if (JDBCDriver != null) {
+			try {
+				Class.forName(JDBCDriver);
+			} catch (ClassNotFoundException e) {
+				throw new RuntimeException("Could not load JDBC driver class", e);
+			}
+		}
+		return DriverManager.getConnection(getShardUrl(shardIndex), userName, userPassword);
+	}
+
+	/**
+	 * Creates a new {@link DbBackendConfig} with the selected shard as its only
+	 * shard.
+	 * 
+	 */
+	public DbBackendConfig createConfigForShard(int shardIndex) {
+		validateShardIndex(shardIndex);
+		DbBackendConfig c = new DbBackendConfig(userName, userPassword, shardUrls.get(shardIndex));
+		c.setJDBCDriver(JDBCDriver);
+		c.setDbAdapterClass(dbAdapterClass);
+		c.setKvCacheSize(kvStateCacheSize);
+		c.setMaxKvInsertBatchSize(maxKvInsertBatchSize);
+		return c;
+	}
+
+	private void validateShardIndex(int i) {
+		if (i < 0) {
+			throw new IllegalArgumentException("Index must not be negative.");
+		} else if (getNumberOfShards() <= i) {
+			throw new IllegalArgumentException("Index must be less than the total number of shards.");
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		final int prime = 31;
+		int result = 1;
+		result = prime * result + ((JDBCDriver == null) ? 0 : JDBCDriver.hashCode());
+		result = prime * result + ((dbAdapterClass == null) ? 0 : dbAdapterClass.hashCode());
+		result = prime * result + kvStateCacheSize;
+		result = prime * result + Float.floatToIntBits(maxKvEvictFraction);
+		result = prime * result + maxKvInsertBatchSize;
+		result = prime * result + kvStateCompactionFreq;
+		result = prime * result + ((shardUrls == null) ? 0 : shardUrls.hashCode());
+		result = prime * result + ((userName == null) ? 0 : userName.hashCode());
+		result = prime * result + ((userPassword == null) ? 0 : userPassword.hashCode());
+		return result;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (this == obj) {
+			return true;
+		}
+		if (obj == null) {
+			return false;
+		}
+		if (getClass() != obj.getClass()) {
+			return false;
+		}
+		DbBackendConfig other = (DbBackendConfig) obj;
+		if (JDBCDriver == null) {
+			if (other.JDBCDriver != null) {
+				return false;
+			}
+		} else if (!JDBCDriver.equals(other.JDBCDriver)) {
+			return false;
+		}
+		if (dbAdapterClass == null) {
+			if (other.dbAdapterClass != null) {
+				return false;
+			}
+		} else if (!dbAdapterClass.equals(other.dbAdapterClass)) {
+			return false;
+		}
+		if (kvStateCacheSize != other.kvStateCacheSize) {
+			return false;
+		}
+		if (Float.floatToIntBits(maxKvEvictFraction) != Float.floatToIntBits(other.maxKvEvictFraction)) {
+			return false;
+		}
+		if (maxKvInsertBatchSize != other.maxKvInsertBatchSize) {
+			return false;
+		}
+		if (kvStateCompactionFreq != other.kvStateCompactionFreq) {
+			return false;
+		}
+		if (shardUrls == null) {
+			if (other.shardUrls != null) {
+				return false;
+			}
+		} else if (!shardUrls.equals(other.shardUrls)) {
+			return false;
+		}
+		if (userName == null) {
+			if (other.userName != null) {
+				return false;
+			}
+		} else if (!userName.equals(other.userName)) {
+			return false;
+		}
+		if (userPassword == null) {
+			if (other.userPassword != null) {
+				return false;
+			}
+		} else if (!userPassword.equals(other.userPassword)) {
+			return false;
+		}
+		return true;
+	}
+
+}

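A minimal configuration sketch for DbBackendConfig above, assuming two MySQL shards; urls, credentials and tuning values are placeholders, and the values noted in the comments mirror the defaults declared in the class:

import java.util.Arrays;

public class DbBackendConfigSketch {
	public static DbBackendConfig build() {
		// Two shards: key-value state writes are partitioned across these urls by subtask index.
		DbBackendConfig conf = new DbBackendConfig("flink", "secret",
				Arrays.asList("jdbc:mysql://db-1:3306/flinkdb", "jdbc:mysql://db-2:3306/flinkdb"));

		conf.setJDBCDriver("com.mysql.jdbc.Driver");  // loaded via Class.forName before connecting
		conf.setKvCacheSize(10000);                   // max key-value pairs cached on heap per task (default 10000)
		conf.setMaxKvCacheEvictFraction(0.1f);        // evict ceil(10000 * 0.1) = 1000 entries when the cache is full
		conf.setMaxKvInsertBatchSize(1000);           // key-value pairs written to the database per batch insert
		conf.setKvStateCompactionFrequency(5);        // compact after every 5th successful checkpoint (off by default)
		return conf;
	}
}

With multiple shard urls, each parallel subtask later connects to shard subtaskIndex % getNumberOfShards(), as done in DbStateBackend.initializeForJob.
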
http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
new file mode 100644
index 0000000..899dbd5
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
+
+/**
+ * {@link StateBackend} for storing checkpoints in databases that support JDBC.
+ * Key-Value state is stored out-of-core and is lazily fetched using the
+ * {@link LazyDbKvState} implementation. A different backend can also be
+ * provided in the constructor to store the non-partitioned states. A common use
+ * case would be to store the key-value states in the database and store larger
+ * non-partitioned states on a distributed file system.
+ * <p>
+ * This backend implementation also allows the sharding of the checkpointed
+ * states among multiple database instances, which can be enabled by passing
+ * multiple database urls to the {@link DbBackendConfig} instance.
+ * <p>
+ * By default there are multiple tables created in the given databases: 1 table
+ * for non-partitioned checkpoints and 1 table for each key-value state in the
+ * streaming program.
+ * <p>
+ * To control table creation, insert/lookup operations and to provide
+ * compatibility for different SQL implementations, a custom {@link MySqlAdapter}
+ * can be supplied in the {@link DbBackendConfig}.
+ *
+ */
+public class DbStateBackend extends StateBackend<DbStateBackend> {
+
+	private static final long serialVersionUID = 1L;
+	private static final Logger LOG = LoggerFactory.getLogger(DbStateBackend.class);
+
+	private Random rnd;
+
+	// ------------------------------------------------------
+
+	private Environment env;
+
+	// ------------------------------------------------------
+
+	private final DbBackendConfig dbConfig;
+	private final DbAdapter dbAdapter;
+
+	private Connection con;
+	private int shardIndex = 0;
+
+	private final int numSqlRetries;
+	private final int sqlRetrySleep;
+	
+	private PreparedStatement insertStatement;
+
+	// ------------------------------------------------------
+
+	// We allow using a different backend for storing non-partitioned states
+	private StateBackend<?> nonPartitionedStateBackend = null;
+
+	// ------------------------------------------------------
+
+	/**
+	 * Create a new {@link DbStateBackend} using the provided
+	 * {@link DbBackendConfig} configuration.
+	 * 
+	 */
+	public DbStateBackend(DbBackendConfig backendConfig) {
+		this.dbConfig = backendConfig;
+		dbAdapter = backendConfig.getDbAdapter();
+		numSqlRetries = backendConfig.getMaxNumberOfSqlRetries();
+		sqlRetrySleep = backendConfig.getSleepBetweenSqlRetries();
+	}
+
+	/**
+	 * Create a new {@link DbStateBackend} using the provided
+	 * {@link DbBackendConfig} configuration and a different backend for storing
+	 * non-partitioned state snapshots.
+	 * 
+	 */
+	public DbStateBackend(DbBackendConfig backendConfig, StateBackend<?> backend) {
+		this(backendConfig);
+		this.nonPartitionedStateBackend = backend;
+	}
+
+	/**
+	 * Get the database connection maintained by the backend.
+	 */
+	public Connection getConnection() {
+		return con;
+	}
+
+	/**
+	 * Check whether the backend has been initialized.
+	 * 
+	 */
+	public boolean isInitialized() {
+		return con != null;
+	}
+	
+	public Environment getEnvironment() {
+		return env;
+	}
+	
+	public int getShardIndex() {
+		return shardIndex;
+	}
+
+	/**
+	 * Get the backend configuration object.
+	 */
+	public DbBackendConfig getConfiguration() {
+		return dbConfig;
+	}
+
+	@Override
+	public <S extends Serializable> StateHandle<S> checkpointStateSerializable(final S state, final long checkpointID,
+			final long timestamp) throws Exception {
+
+		// If a different backend was set for non-partitioned checkpoints we use
+		// that, otherwise we write to the database.
+		if (nonPartitionedStateBackend == null) {
+			return retry(new Callable<DbStateHandle<S>>() {
+				public DbStateHandle<S> call() throws Exception {
+					// We create a unique long id for each handle, but we also
+					// store the checkpoint id and timestamp for bookkeeping
+					long handleId = rnd.nextLong();
+
+					dbAdapter.setCheckpointInsertParams(env.getJobID().toString(), insertStatement,
+							checkpointID, timestamp, handleId,
+							InstantiationUtil.serializeObject(state));
+
+					insertStatement.executeUpdate();
+
+					return new DbStateHandle<S>(env.getJobID().toString(), checkpointID, timestamp, handleId,
+							dbConfig.createConfigForShard(shardIndex));
+				}
+			}, numSqlRetries, sqlRetrySleep);
+		} else {
+			return nonPartitionedStateBackend.checkpointStateSerializable(state, checkpointID, timestamp);
+		}
+	}
+
+	@Override
+	public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp)
+			throws Exception {
+		if (nonPartitionedStateBackend == null) {
+			// We don't implement this functionality for the DbStateBackend as
+			// we cannot directly write a stream to the database anyway.
+			throw new UnsupportedOperationException("Use checkpointStateSerializable instead.");
+		} else {
+			return nonPartitionedStateBackend.createCheckpointStateOutputStream(checkpointID, timestamp);
+		}
+	}
+
+	@Override
+	public <K, V> LazyDbKvState<K, V> createKvState(int operatorId, String stateName,
+			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws IOException {
+		return new LazyDbKvState<K, V>(
+				env.getJobID() + "_" + operatorId + "_" + stateName,
+				shardIndex == env.getIndexInSubtaskGroup(),
+				getConnection(),
+				getConfiguration(),
+				keySerializer,
+				valueSerializer,
+				defaultValue);
+	}
+
+	@Override
+	public void initializeForJob(final Environment env) throws Exception {
+		this.rnd = new Random();
+		this.env = env;
+
+		// If there are multiple shards provided in the config we partition the
+		// writes by subtask index
+		shardIndex = env.getIndexInSubtaskGroup() % dbConfig.getNumberOfShards();
+
+		con = dbConfig.createConnection(shardIndex);
+		// We want the most light-weight transaction isolation level as we don't
+		// have conflicting reads/writes. We just want to be able to roll back
+		// batch inserts for k-v snapshots. This requirement might be removed in
+		// the future.
+		con.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
+
+		// If we have a different backend for non-partitioned states we
+		// initialize that, otherwise create tables for storing the checkpoints.
+		if (nonPartitionedStateBackend == null) {
+			insertStatement = retry(new Callable<PreparedStatement>() {
+				public PreparedStatement call() throws SQLException {
+					dbAdapter.createCheckpointsTable(env.getJobID().toString(), getConnection());
+					return dbAdapter.prepareCheckpointInsert(env.getJobID().toString(), getConnection());
+				}
+			}, numSqlRetries, sqlRetrySleep);
+		} else {
+			nonPartitionedStateBackend.initializeForJob(env);
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Database state backend successfully initialized");
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		// We first close the statement/non-partitioned backend, then we close
+		// the database connection
+		try (Connection c = con) {
+			if (nonPartitionedStateBackend == null) {
+				insertStatement.close();
+			} else {
+				nonPartitionedStateBackend.close();
+			}
+		}
+	}
+
+	@Override
+	public void disposeAllStateForCurrentJob() throws Exception {
+		if (nonPartitionedStateBackend == null) {
+			dbAdapter.disposeAllStateForJob(env.getJobID().toString(), con);
+		} else {
+			nonPartitionedStateBackend.disposeAllStateForCurrentJob();
+		}
+	}
+}

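A small wiring sketch for DbStateBackend above, matching the two constructors defined in the class; the delegate backend is a placeholder for any other StateBackend (for example a file system backend):

import org.apache.flink.runtime.state.StateBackend;

public class DbStateBackendSketch {

	// Store both the key-value state and the non-partitioned snapshots in the database.
	public static DbStateBackend dbOnly(DbBackendConfig conf) {
		return new DbStateBackend(conf);
	}

	// Keep the key-value state in the database but delegate non-partitioned
	// snapshots to a different StateBackend.
	public static DbStateBackend withDelegate(DbBackendConfig conf, StateBackend<?> nonPartitioned) {
		return new DbStateBackend(conf, nonPartitioned);
	}
}
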
http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
new file mode 100644
index 0000000..93d6419
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.util.concurrent.Callable;
+
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.InstantiationUtil;
+import org.eclipse.jetty.util.log.Log;
+
+/**
+ * State handle implementation for storing checkpoints as byte arrays in
+ * databases using the {@link MySqlAdapter} defined in the {@link DbBackendConfig}.
+ * 
+ */
+public class DbStateHandle<S> implements Serializable, StateHandle<S> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final String jobId;
+	private final DbBackendConfig dbConfig;
+
+	private final long checkpointId;
+	private final long checkpointTs;
+
+	private final long handleId;
+
+	public DbStateHandle(String jobId, long checkpointId, long checkpointTs, long handleId, DbBackendConfig dbConfig) {
+		this.checkpointId = checkpointId;
+		this.handleId = handleId;
+		this.jobId = jobId;
+		this.dbConfig = dbConfig;
+		this.checkpointTs = checkpointTs;
+	}
+
+	protected byte[] getBytes() throws IOException {
+		return retry(new Callable<byte[]>() {
+			public byte[] call() throws Exception {
+				try (Connection con = dbConfig.createConnection()) {
+					return dbConfig.getDbAdapter().getCheckpoint(jobId, con, checkpointId, checkpointTs, handleId);
+				}
+			}
+		}, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries());
+	}
+
+	@Override
+	public void discardState() {
+		try {
+			retry(new Callable<Boolean>() {
+				public Boolean call() throws Exception {
+					try (Connection con = dbConfig.createConnection()) {
+						dbConfig.getDbAdapter().deleteCheckpoint(jobId, con, checkpointId, checkpointTs, handleId);
+					}
+					return true;
+				}
+			}, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries());
+		} catch (IOException e) {
+			// We don't want to fail the job here, but log the error.
+			if (Log.isDebugEnabled()) {
+				Log.debug("Could not discard state.");
+			}
+		}
+	}
+
+	@Override
+	public S getState(ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
+		return InstantiationUtil.deserializeObject(getBytes(), userCodeClassLoader);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java
new file mode 100644
index 0000000..cbacbc4
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+
+import org.apache.derby.client.am.SqlException;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+
+/**
+ * 
+ * Lazily fetched {@link KvState} using a SQL backend. Key-value pairs are
+ * cached on heap and are lazily retrieved on access.
+ * 
+ */
+public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend>, CheckpointNotifier {
+
+	private static final Logger LOG = LoggerFactory.getLogger(LazyDbKvState.class);
+
+	// ------------------------------------------------------
+
+	// Unique id for this state (jobID_operatorID_stateName)
+	private final String kvStateId;
+	private final boolean compact;
+
+	private K currentKey;
+	private final V defaultValue;
+
+	private final TypeSerializer<K> keySerializer;
+	private final TypeSerializer<V> valueSerializer;
+
+	// ------------------------------------------------------
+
+	// Max number of retries for failed database operations
+	private final int numSqlRetries;
+	// Sleep time between two retries
+	private final int sqlRetrySleep;
+	// Max number of key-value pairs inserted in one batch to the database
+	private final int maxInsertBatchSize;
+	// We will do database compaction every so many checkpoints
+	private final int compactEvery;
+
+	// Database properties
+	private final DbBackendConfig conf;
+	private final Connection con;
+	private final MySqlAdapter dbAdapter;
+	private final BatchInsert batchInsert;
+
+	// Statements for key-lookups and inserts as prepared by the dbAdapter
+	private PreparedStatement selectStatement;
+	private PreparedStatement insertStatement;
+
+	// ------------------------------------------------------
+
+	// LRU cache for the key-value states backed by the database
+	private final StateCache cache;
+
+	private long nextCheckpointId;
+
+	// ------------------------------------------------------
+
+	/**
+	 * Constructor to initialize the {@link LazyDbKvState} the first time the
+	 * job starts.
+	 */
+	public LazyDbKvState(String kvStateId, boolean compact, Connection con, DbBackendConfig conf,
+			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws IOException {
+		this(kvStateId, compact, con, conf, keySerializer, valueSerializer, defaultValue, 1);
+	}
+
+	/**
+	 * Initialize the {@link LazyDbKvState} from a snapshot.
+	 */
+	public LazyDbKvState(String kvStateId, boolean compact, Connection con, final DbBackendConfig conf,
+			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue, long nextId)
+					throws IOException {
+
+		this.kvStateId = kvStateId;
+		this.compact = compact;
+
+		this.keySerializer = keySerializer;
+		this.valueSerializer = valueSerializer;
+		this.defaultValue = defaultValue;
+
+		this.maxInsertBatchSize = conf.getMaxKvInsertBatchSize();
+		this.conf = conf;
+		this.con = con;
+		this.dbAdapter = conf.getDbAdapter();
+		this.compactEvery = conf.getKvStateCompactionFrequency();
+		this.numSqlRetries = conf.getMaxNumberOfSqlRetries();
+		this.sqlRetrySleep = conf.getSleepBetweenSqlRetries();
+
+		this.nextCheckpointId = nextId;
+
+		this.cache = new StateCache(conf.getKvCacheSize(), conf.getNumElementsToEvict());
+
+		initDB(this.con);
+
+		batchInsert = new BatchInsert();
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Lazy database kv-state ({}) successfully initialized", kvStateId);
+		}
+	}
+
+	@Override
+	public void setCurrentKey(K key) {
+		this.currentKey = key;
+	}
+
+	@Override
+	public void update(V value) throws IOException {
+		try {
+			cache.put(currentKey, Optional.fromNullable(value));
+		} catch (RuntimeException e) {
+			// We need to catch the RuntimeExceptions thrown in the StateCache
+			// methods here
+			throw new IOException(e);
+		}
+	}
+
+	@Override
+	public V value() throws IOException {
+		try {
+			// We get the value from the cache (which will automatically load it
+			// from the database if necessary). If null, we return a copy of the
+			// default value
+			V val = cache.get(currentKey).orNull();
+			return val != null ? val : copyDefault();
+		} catch (RuntimeException e) {
+			// We need to catch the RuntimeExceptions thrown in the StateCache
+			// methods here
+			throw new IOException(e);
+		}
+	}
+
+	@Override
+	public DbKvStateSnapshot<K, V> shapshot(long checkpointId, long timestamp) throws IOException {
+
+		// We insert the modified elements to the database with the current id
+		// then clear the modified states
+		for (Entry<K, Optional<V>> state : cache.modified.entrySet()) {
+			batchInsert.add(state, checkpointId);
+		}
+		batchInsert.flush(checkpointId);
+		cache.modified.clear();
+
+		// We increase the next checkpoint id
+		nextCheckpointId = checkpointId + 1;
+
+		return new DbKvStateSnapshot<K, V>(kvStateId, checkpointId);
+	}
+
+	/**
+	 * Returns the number of elements currently stored in the task's cache. Note
+	 * that the number of elements in the database is not counted here.
+	 */
+	@Override
+	public int size() {
+		return cache.size();
+	}
+
+	/**
+	 * Return a copy of the default value, or null if the default was null.
+	 * 
+	 */
+	private V copyDefault() {
+		return defaultValue != null ? valueSerializer.copy(defaultValue) : null;
+	}
+
+	/**
+	 * Create a table for the kvstate checkpoints (based on the kvStateId) and
+	 * prepare the statements used during checkpointing.
+	 */
+	private void initDB(final Connection con) throws IOException {
+
+		retry(new Callable<Void>() {
+			public Void call() throws Exception {
+
+				dbAdapter.createKVStateTable(kvStateId, con);
+
+				insertStatement = dbAdapter.prepareKVCheckpointInsert(kvStateId, con);
+				selectStatement = dbAdapter.prepareKeyLookup(kvStateId, con);
+
+				return null;
+			}
+
+		}, numSqlRetries, sqlRetrySleep);
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) {
+		// If compaction is turned on we compact on the first subtask
+		if (compactEvery > 0 && compact && checkpointId % compactEvery == 0) {
+			try {
+				dbAdapter.compactKvStates(kvStateId, con, 0, checkpointId);
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("State successfully compacted for {}.", kvStateId);
+				}
+			} catch (SQLException e) {
+				LOG.warn("State compaction failed due to: {}", e);
+			}
+		}
+	}
+
+	@Override
+	public void dispose() {
+		// We are only closing the statements here, the connection is borrowed
+		// from the state backend and will be closed there.
+		try {
+			selectStatement.close();
+		} catch (SQLException e) {
+			// There is not much to do about this
+		}
+		try {
+			insertStatement.close();
+		} catch (SQLException e) {
+			// There is not much to do about this
+		}
+	}
+
+	/**
+	 * Return the Map of cached states.
+	 * 
+	 */
+	public Map<K, Optional<V>> getStateCache() {
+		return cache;
+	}
+
+	/**
+	 * Return the Map of modified states that have not been written to the
+	 * database yet.
+	 * 
+	 */
+	public Map<K, Optional<V>> getModified() {
+		return cache.modified;
+	}
+
+	public boolean isCompacter() {
+		return compact;
+	}
+
+	/**
+	 * Snapshot that stores a specific checkpoint id and state id, and also
+	 * rolls back the database to that point upon restore. The rollback is done
+	 * by removing all state checkpoints that have ids between the checkpoint
+	 * and recovery id.
+	 *
+	 */
+	private static class DbKvStateSnapshot<K, V> implements KvStateSnapshot<K, V, DbStateBackend> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final String kvStateId;
+		private final long checkpointId;
+
+		public DbKvStateSnapshot(String kvStateId, long checkpointId) {
+			this.checkpointId = checkpointId;
+			this.kvStateId = kvStateId;
+		}
+
+		@Override
+		public LazyDbKvState<K, V> restoreState(final DbStateBackend stateBackend,
+				final TypeSerializer<K> keySerializer, final TypeSerializer<V> valueSerializer, final V defaultValue,
+				ClassLoader classLoader, final long nextId) throws IOException {
+
+			// First we clean up the states written by partially failed
+			// snapshots (we only do it on 1 subtask)
+			retry(new Callable<Void>() {
+				public Void call() throws Exception {
+					stateBackend.getConfiguration().getDbAdapter().cleanupFailedCheckpoints(kvStateId,
+							stateBackend.getConnection(), checkpointId, nextId);
+
+					return null;
+				}
+			}, stateBackend.getConfiguration().getMaxNumberOfSqlRetries(),
+					stateBackend.getConfiguration().getSleepBetweenSqlRetries());
+
+			boolean cleanup = stateBackend.getEnvironment().getIndexInSubtaskGroup() == stateBackend.getShardIndex();
+
+			// Restore the KvState
+			LazyDbKvState<K, V> restored = new LazyDbKvState<K, V>(kvStateId, cleanup,
+					stateBackend.getConnection(), stateBackend.getConfiguration(), keySerializer, valueSerializer,
+					defaultValue, nextId);
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("KV state({},{}) restored.", kvStateId, nextId);
+			}
+
+			return restored;
+		}
+
+		@Override
+		public void discardState() throws Exception {
+			// Don't discard, it will be compacted by the LazyDbKvState
+		}
+
+	}
+
+	/**
+	 * LRU cache implementation for storing the key-value states. When the cache
+	 * is full elements are not evicted one by one but are evicted in a batch
+	 * defined by the evictionSize parameter.
+	 * <p>
+	 * Keys not found in the cache will be retrieved from the underlying
+	 * database.
+	 */
+	private final class StateCache extends LinkedHashMap<K, Optional<V>> {
+		private static final long serialVersionUID = 1L;
+
+		private final int cacheSize;
+		private final int evictionSize;
+
+		// We keep track of the states modified since the last checkpoint
+		private final Map<K, Optional<V>> modified = new HashMap<>();
+
+		public StateCache(int cacheSize, int evictionSize) {
+			super(cacheSize, 0.75f, true);
+			this.cacheSize = cacheSize;
+			this.evictionSize = evictionSize;
+		}
+
+		@Override
+		public Optional<V> put(K key, Optional<V> value) {
+			// Put kv pair in the cache and evict elements if the cache is full
+			Optional<V> old = super.put(key, value);
+			modified.put(key, value);
+			evictIfFull();
+			return old;
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public Optional<V> get(Object key) {
+			// First we check whether the value is cached
+			Optional<V> value = super.get(key);
+			if (value == null) {
+				// If it is not cached, try to load it from the database
+				value = Optional.fromNullable(getFromDatabaseOrNull((K) key));
+				put((K) key, value);
+			}
+			// We currently mark elements that were retrieved also as modified
+			// in case the user applies some mutation without update.
+			modified.put((K) key, value);
+			return value;
+		}
+
+		@Override
+		protected boolean removeEldestEntry(Entry<K, Optional<V>> eldest) {
+			// We need to remove elements manually if the cache becomes full, so
+			// we always return false here.
+			return false;
+		}
+
+		/**
+		 * Fetch the current value from the database if it exists, or return null.
+		 * 
+		 * @param key
+		 * @return The value corresponding to the key and the last checkpoint id
+		 *         from the database if it exists, or null.
+		 */
+		private V getFromDatabaseOrNull(final K key) {
+			try {
+				return retry(new Callable<V>() {
+					public V call() throws Exception {
+						// We lookup using the adapter and serialize/deserialize
+						// with the TypeSerializers
+						byte[] serializedVal = dbAdapter.lookupKey(kvStateId, selectStatement,
+								InstantiationUtil.serializeToByteArray(keySerializer, key), nextCheckpointId);
+
+						return serializedVal != null
+								? InstantiationUtil.deserializeFromByteArray(valueSerializer, serializedVal) : null;
+					}
+				}, numSqlRetries, sqlRetrySleep);
+			} catch (IOException e) {
+				// We need to re-throw this exception to conform to the map
+				// interface, we will catch this when we call the put/get
+				throw new RuntimeException(e);
+			}
+		}
+
+		/**
+		 * If the cache is full we remove the evictionSize least recently
+		 * accessed elements and write them to the database if they were
+		 * modified since the last checkpoint.
+		 */
+		private void evictIfFull() {
+			if (size() > cacheSize) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("State cache is full for {}, evicting {} elements.", kvStateId, evictionSize);
+				}
+				try {
+					int numEvicted = 0;
+
+					Iterator<Entry<K, Optional<V>>> entryIterator = entrySet().iterator();
+					while (numEvicted++ < evictionSize && entryIterator.hasNext()) {
+
+						Entry<K, Optional<V>> next = entryIterator.next();
+
+						// We only need to write to the database if modified
+						if (modified.remove(next.getKey()) != null) {
+							batchInsert.add(next, nextCheckpointId);
+						}
+
+						entryIterator.remove();
+					}
+
+					batchInsert.flush(nextCheckpointId);
+
+				} catch (IOException e) {
+					// We need to re-throw this exception to conform to the map
+					// interface, we will catch this when we call the
+					// put/get
+					throw new RuntimeException(e);
+				}
+			}
+		}
+
+		@Override
+		public void putAll(Map<? extends K, ? extends Optional<V>> m) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void clear() {
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	// Object for handling inserts to the database by batching them together
+	private class BatchInsert {
+		List<Tuple2<byte[], byte[]>> toInsert = new ArrayList<>();
+
+		public void add(Entry<K, Optional<V>> next, long checkpointId) throws IOException {
+			K k = next.getKey();
+			V v = next.getValue().orNull();
+			toInsert.add(Tuple2.of(InstantiationUtil.serializeToByteArray(keySerializer, k),
+					v != null ? InstantiationUtil.serializeToByteArray(valueSerializer, v) : null));
+			if (toInsert.size() == maxInsertBatchSize) {
+				flush(checkpointId);
+			}
+		}
+
+		public void flush(long checkpointId) throws IOException {
+			if (!toInsert.isEmpty()) {
+				dbAdapter.insertBatch(kvStateId, conf, con, insertStatement, checkpointId, toInsert);
+				toInsert.clear();
+			}
+		}
+	}
+}

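For intuition on the cache sizing used by StateCache and BatchInsert above, a hypothetical arithmetic sketch using the DbBackendConfig defaults (the eviction count mirrors getNumElementsToEvict(), the batch size mirrors the BatchInsert flush threshold):

public class EvictionMathSketch {
	public static void main(String[] args) {
		int kvCacheSize = 10000;      // DbBackendConfig default cache size
		float evictFraction = 0.1f;   // DbBackendConfig default eviction fraction
		int insertBatchSize = 1000;   // DbBackendConfig default max insert batch size

		// Mirrors DbBackendConfig#getNumElementsToEvict(): entries removed per eviction.
		int numElementsToEvict = (int) Math.ceil(kvCacheSize * evictFraction);

		// When the cache grows past kvCacheSize, the least recently used
		// numElementsToEvict entries are dropped; entries modified since the last
		// checkpoint are handed to BatchInsert, which flushes every insertBatchSize pairs.
		System.out.println("evict " + numElementsToEvict + " of " + kvCacheSize
				+ " cached entries, written in batches of " + insertBatchSize);
	}
}
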
http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
new file mode 100644
index 0000000..1cbe696
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * Adapter for bridging inconsistencies between different SQL
+ * implementations. This default implementation has been tested to work well
+ * with MySQL.
+ */
+public class MySqlAdapter implements Serializable, DbAdapter {
+
+	private static final long serialVersionUID = 1L;
+
+	// -----------------------------------------------------------------------------
+	// Non-partitioned state checkpointing
+	// -----------------------------------------------------------------------------
+
+	@Override
+	public void createCheckpointsTable(String jobId, Connection con) throws SQLException {
+		try (Statement smt = con.createStatement()) {
+			smt.executeUpdate(
+					"CREATE TABLE IF NOT EXISTS checkpoints_" + jobId
+							+ " ("
+							+ "checkpointId bigint, "
+							+ "timestamp bigint, "
+							+ "handleId bigint,"
+							+ "checkpoint blob,"
+							+ "PRIMARY KEY (handleId)"
+							+ ")");
+		}
+
+	}
+
+	@Override
+	public PreparedStatement prepareCheckpointInsert(String jobId, Connection con) throws SQLException {
+		return con.prepareStatement(
+				"INSERT INTO checkpoints_" + jobId
+						+ " (checkpointId, timestamp, handleId, checkpoint) VALUES (?,?,?,?)");
+	}
+
+	@Override
+	public void setCheckpointInsertParams(String jobId, PreparedStatement insertStatement, long checkpointId,
+			long timestamp, long handleId, byte[] checkpoint) throws SQLException {
+		insertStatement.setLong(1, checkpointId);
+		insertStatement.setLong(2, timestamp);
+		insertStatement.setLong(3, handleId);
+		insertStatement.setBytes(4, checkpoint);
+	}
+
+	@Override
+	public byte[] getCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
+			throws SQLException {
+		try (Statement smt = con.createStatement()) {
+			ResultSet rs = smt.executeQuery(
+					"SELECT checkpoint FROM checkpoints_" + jobId
+							+ " WHERE handleId = " + handleId);
+			if (rs.next()) {
+				return rs.getBytes(1);
+			} else {
+				throw new SQLException("Checkpoint cannot be found in the database.");
+			}
+		}
+	}
+
+	@Override
+	public void deleteCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
+			throws SQLException {
+		try (Statement smt = con.createStatement()) {
+			smt.executeUpdate(
+					"DELETE FROM checkpoints_" + jobId
+							+ " WHERE handleId = " + handleId);
+		}
+	}
+
+	@Override
+	public void disposeAllStateForJob(String jobId, Connection con) throws SQLException {
+		try (Statement smt = con.createStatement()) {
+			smt.executeUpdate(
+					"DROP TABLE checkpoints_" + jobId);
+		}
+	}
+
+	// -----------------------------------------------------------------------------
+	// Partitioned state checkpointing
+	// -----------------------------------------------------------------------------
+
+	@Override
+	public void createKVStateTable(String stateId, Connection con) throws SQLException {
+		validateStateId(stateId);
+		try (Statement smt = con.createStatement()) {
+			smt.executeUpdate(
+					"CREATE TABLE IF NOT EXISTS kvstate_" + stateId
+							+ " ("
+							+ "id bigint, "
+							+ "k varbinary(256), "
+							+ "v blob, "
+							+ "PRIMARY KEY (k, id) "
+							+ ")");
+		}
+	}
+
+	@Override
+	public PreparedStatement prepareKVCheckpointInsert(String stateId, Connection con) throws SQLException {
+		validateStateId(stateId);
+		return con.prepareStatement(
+				"INSERT INTO kvstate_" + stateId + " (id, k, v) VALUES (?,?,?) "
+						+ "ON DUPLICATE KEY UPDATE v=? ");
+	}
+
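+	// A key lookup returns the newest value written for the key at or before the
+	// given checkpoint id, so stale versions left behind before compaction never
+	// shadow a more recent update.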
+	@Override
+	public PreparedStatement prepareKeyLookup(String stateId, Connection con) throws SQLException {
+		validateStateId(stateId);
+		return con.prepareStatement("SELECT v"
+				+ " FROM kvstate_" + stateId
+				+ " WHERE k = ?"
+				+ " AND id <= ?"
+				+ " ORDER BY id DESC LIMIT 1");
+	}
+
+	@Override
+	public byte[] lookupKey(String stateId, PreparedStatement lookupStatement, byte[] key, long lookupId)
+			throws SQLException {
+		lookupStatement.setBytes(1, key);
+		lookupStatement.setLong(2, lookupId);
+
+		ResultSet res = lookupStatement.executeQuery();
+
+		if (res.next()) {
+			return res.getBytes(1);
+		} else {
+			return null;
+		}
+	}
+
+	@Override
+	public void cleanupFailedCheckpoints(String stateId, Connection con, long checkpointId,
+			long nextId) throws SQLException {
+		validateStateId(stateId);
+		try (Statement smt = con.createStatement()) {
+			smt.executeUpdate("DELETE FROM kvstate_" + stateId
+					+ " WHERE id > " + checkpointId
+					+ " AND id < " + nextId);
+		}
+	}
+
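+	// Compaction keeps only the newest entry per key among the versions written
+	// in the [lowerId, upperId] checkpoint range and deletes the older ones.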
+	protected void compactKvStates(String stateId, Connection con, long lowerId, long upperId)
+			throws SQLException {
+		validateStateId(stateId);
+
+		try (Statement smt = con.createStatement()) {
+			smt.executeUpdate("DELETE state.* FROM kvstate_" + stateId + " AS state"
+					+ " JOIN"
+					+ " ("
+					+ " 	SELECT MAX(id) AS maxts, k FROM kvstate_" + stateId
+					+ " 	WHERE id BETWEEN " + lowerId + " AND " + upperId
+					+ " 	GROUP BY k"
+					+ " ) m"
+					+ " ON state.k = m.k"
+					+ " AND state.id < m.maxts");
+		}
+	}
+
+	/**
+	 * Validates the state name before it is concatenated into a SQL statement,
+	 * guarding against SQL injection through malicious state names.
+	 */
+	protected static void validateStateId(String name) {
+		if (!name.matches("[a-zA-Z0-9_]+")) {
+			throw new RuntimeException("State name contains invalid characters.");
+		}
+	}
+
+	@Override
+	public void insertBatch(final String stateId, final DbBackendConfig conf,
+			final Connection con, final PreparedStatement insertStatement, final long checkpointId,
+			final List<Tuple2<byte[], byte[]>> toInsert) throws IOException {
+
+		SQLRetrier.retry(new Callable<Void>() {
+			public Void call() throws Exception {
+				for (Tuple2<byte[], byte[]> kv : toInsert) {
+					setKvInsertParams(stateId, insertStatement, checkpointId, kv.f0, kv.f1);
+					insertStatement.addBatch();
+				}
+				insertStatement.executeBatch();
+				insertStatement.clearBatch();
+				return null;
+			}
+		}, new Callable<Void>() {
+			public Void call() throws Exception {
+				insertStatement.clearBatch();
+				return null;
+			}
+		}, conf.getMaxNumberOfSqlRetries(), conf.getSleepBetweenSqlRetries());
+	}
+
+	private void setKvInsertParams(String stateId, PreparedStatement insertStatement, long checkpointId,
+			byte[] key, byte[] value) throws SQLException {
+		insertStatement.setLong(1, checkpointId);
+		insertStatement.setBytes(2, key);
+		if (value != null) {
+			insertStatement.setBytes(3, value);
+			insertStatement.setBytes(4, value);
+		} else {
+			insertStatement.setNull(3, Types.BLOB);
+			insertStatement.setNull(4, Types.BLOB);
+		}
+	}
+
+}

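The kvstate_<stateId> table above stores one row per (key, checkpoint id) pair, and reads resolve to the newest version at or before a given id. A standalone JDBC sketch of that read path, assuming an already-validated state id and an existing table with the same k/v/id columns, could look like the following:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Sketch of the versioned read path: the newest value at or before the requested id wins.
public class VersionedLookupSketch {

	public static byte[] lookupLatest(Connection con, String stateId, byte[] key, long maxId)
			throws SQLException {
		// Same shape as MySqlAdapter.prepareKeyLookup above; stateId is assumed to have
		// been validated before being concatenated into the query text.
		String sql = "SELECT v FROM kvstate_" + stateId
				+ " WHERE k = ? AND id <= ? ORDER BY id DESC LIMIT 1";
		try (PreparedStatement smt = con.prepareStatement(sql)) {
			smt.setBytes(1, key);
			smt.setLong(2, maxId);
			try (ResultSet rs = smt.executeQuery()) {
				// No row at or before maxId means the key has no stored value.
				return rs.next() ? rs.getBytes(1) : null;
			}
		}
	}
}
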
http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/SQLRetrier.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/SQLRetrier.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/SQLRetrier.java
new file mode 100644
index 0000000..4ae3fd2
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/SQLRetrier.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.SQLException;
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple utility to retry failed SQL commands a predefined number of times
+ * before declaring failure. The retrier waits a random amount of time between
+ * two consecutive retries.
+ *
+ */
+public final class SQLRetrier implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(SQLRetrier.class);
+	private static final Random rnd = new Random();
+
+	private static final int SLEEP_TIME = 10;
+
+	private SQLRetrier() {
+
+	}
+
+	/**
+	 * Tries to run the given {@link Callable} the predefined number of times
+	 * before throwing an {@link IOException}. This method only retries
+	 * calls that fail with a {@link SQLException}; other exceptions cause a
+	 * {@link RuntimeException}.
+	 * 
+	 * @param callable
+	 *            The callable to be retried.
+	 * @param numRetries
+	 *            Max number of retries before throwing an {@link IOException}.
+	 * @throws IOException
+	 *             The wrapped {@link SQLException}.
+	 */
+	public static <X> X retry(Callable<X> callable, int numRetries) throws IOException {
+		return retry(callable, numRetries, SLEEP_TIME);
+	}
+
+	/**
+	 * Tries to run the given {@link Callable} the predefined number of times
+	 * before throwing an {@link IOException}. This method only retries
+	 * calls that fail with a {@link SQLException}; other exceptions cause a
+	 * {@link RuntimeException}.
+	 * 
+	 * @param callable
+	 *            The callable to be retried.
+	 * @param numRetries
+	 *            Max number of retries before throwing an {@link IOException}.
+	 * @param sleep
+	 *            The retrier will wait a random number of msecs between 1 and
+	 *            sleep.
+	 * @return The result of the {@link Callable#call()}.
+	 * @throws IOException
+	 *             The wrapped {@link SQLException}.
+	 */
+	public static <X> X retry(Callable<X> callable, int numRetries, int sleep) throws IOException {
+		int numtries = 0;
+		while (true) {
+			try {
+				return callable.call();
+			} catch (SQLException e) {
+				handleSQLException(e, ++numtries, numRetries, sleep);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+		}
+	}
+
+	/**
+	 * Tries to run the given {@link Callable} the predefined number of times
+	 * before throwing an {@link IOException}. This method only retries
+	 * calls that fail with a {@link SQLException}; other exceptions cause a
+	 * {@link RuntimeException}.
+	 * 
+	 * Additionally the user can supply a second callable which will be executed
+	 * every time the first callable throws a {@link SQLException}.
+	 * 
+	 * @param callable
+	 *            The callable to be retried.
+	 * @param onException
+	 *            The callable to be executed when an {@link SQLException} was
+	 *            encountered. Exceptions thrown during this call are ignored.
+	 * @param numRetries
+	 *            Max number of retries before throwing an {@link IOException}.
+	 * @param sleep
+	 *            The retrier will wait a random number of msecs between 1 and
+	 *            sleep.
+	 * @return The result of the {@link Callable#call()}.
+	 * @throws IOException
+	 *             The wrapped {@link SQLException}.
+	 */
+	public static <X, Y> X retry(Callable<X> callable, Callable<Y> onException, int numRetries, int sleep)
+			throws IOException {
+		int numtries = 0;
+		while (true) {
+			try {
+				return callable.call();
+			} catch (SQLException se) {
+				try {
+					onException.call();
+				} catch (Exception e) {
+					// Exceptions thrown in this call will be ignored
+				}
+				handleSQLException(se, ++numtries, numRetries, sleep);
+			} catch (Exception ex) {
+				throw new RuntimeException(ex);
+			}
+		}
+	}
+
+	/**
+	 * Tries to run the given {@link Callable} the predefined number of times
+	 * before throwing an {@link IOException}. This method only retries
+	 * calls that fail with a {@link SQLException}; other exceptions cause a
+	 * {@link RuntimeException}.
+	 * 
+	 * Additionally the user can supply a second callable which will be executed
+	 * every time the first callable throws a {@link SQLException}.
+	 * 
+	 * @param callable
+	 *            The callable to be retried.
+	 * @param onException
+	 *            The callable to be executed when an {@link SQLException} was
+	 *            encountered. Exceptions thrown during this call are ignored.
+	 * @param numRetries
+	 *            Max number of retries before throwing an {@link IOException}.
+	 * @return The result of the {@link Callable#call()}.
+	 * @throws IOException
+	 *             The wrapped {@link SQLException}.
+	 */
+	public static <X, Y> X retry(Callable<X> callable, Callable<Y> onException, int numRetries)
+			throws IOException {
+		return retry(callable, onException, numRetries, SLEEP_TIME);
+	}
+
+	private static void handleSQLException(SQLException e, int numTries, int maxRetries, int sleep) throws IOException {
+		if (numTries < maxRetries) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Error while executing SQL statement: {}\nRetrying...",
+						e.getMessage());
+			}
+			try {
+				Thread.sleep(numTries * (rnd.nextInt(sleep) + 1));
+			} catch (InterruptedException ie) {
+				Thread.currentThread().interrupt();
+				throw new RuntimeException("Thread has been interrupted.", ie);
+			}
+		} else {
+			throw new IOException(
+					"Could not execute SQL statement after " + maxRetries + " retries.", e);
+		}
+	}
+}

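SQLRetrier retries a Callable only when it fails with an SQLException, backing off between attempts, and throws an IOException wrapping the last SQLException once the retry budget is exhausted. A small usage sketch, assuming an open JDBC Connection and an arbitrary update statement, might be:

import java.io.IOException;
import java.sql.Connection;
import java.sql.Statement;
import java.util.concurrent.Callable;

// Sketch of retrying a flaky JDBC update with SQLRetrier (connection and SQL are assumed inputs).
public class RetrierUsageSketch {

	public static int updateWithRetries(final Connection con, final String sql) throws IOException {
		// Only SQLExceptions are retried; after 3 failed attempts an IOException is thrown.
		return SQLRetrier.retry(new Callable<Integer>() {
			public Integer call() throws Exception {
				try (Statement smt = con.createStatement()) {
					return smt.executeUpdate(sql);
				}
			}
		}, 3);
	}
}
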
http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
new file mode 100644
index 0000000..4cd2333
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.derby.drda.NetworkServerControl;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.checkpointing.PartitionedStateCheckpointingITCase.IdentityKeySelector;
+import org.apache.flink.test.checkpointing.PartitionedStateCheckpointingITCase.NonSerializableLong;
+import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
+import org.junit.After;
+import org.junit.Before;
+
+import com.google.common.collect.Lists;
+
+@SuppressWarnings("serial")
+public class DBStateCheckpointingTest extends StreamFaultToleranceTestBase {
+
+	final long NUM_STRINGS = 1_000_000L;
+	final static int NUM_KEYS = 100;
+	private static NetworkServerControl server;
+	private static File tempDir;
+
+	@Before
+	public void startDerbyServer() throws Exception {
+		server = new NetworkServerControl(InetAddress.getByName("localhost"), 1526, "flink", "flink");
+		server.start(null);
+		tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+	}
+
+	@After
+	public void stopDerbyServer() {
+		try {
+			server.shutdown();
+			FileUtils.deleteDirectory(new File(tempDir.getAbsolutePath() + "/flinkDB1"));
+			FileUtils.deleteDirectory(new File(tempDir.getAbsolutePath() + "/flinkDB2"));
+			FileUtils.forceDelete(new File("derby.log"));
+		} catch (Exception ignore) {
+		}
+	}
+
+	@Override
+	public void testProgram(StreamExecutionEnvironment env) {
+		env.enableCheckpointing(500);
+
+		// We create 2 shards
+		List<String> derbyShards = Lists.newArrayList(
+				"jdbc:derby://localhost:1526/" + tempDir.getAbsolutePath() + "/flinkDB1;create=true",
+				"jdbc:derby://localhost:1526/" + tempDir.getAbsolutePath() + "/flinkDB2;create=true");
+
+		DbBackendConfig conf = new DbBackendConfig("flink", "flink", derbyShards);
+		conf.setDbAdapterClass(DerbyAdapter.class);
+		conf.setKvStateCompactionFrequency(2);
+
+		// We store the non-partitioned states (source offset) in-memory
+		DbStateBackend backend = new DbStateBackend(conf, new MemoryStateBackend());
+
+		env.setStateBackend(backend);
+
+		DataStream<Integer> stream1 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
+		DataStream<Integer> stream2 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2));
+
+		stream1.union(stream2).keyBy(new IdentityKeySelector<Integer>()).map(new OnceFailingPartitionedSum(NUM_STRINGS))
+				.keyBy(0).addSink(new CounterSink());
+	}
+
+	@Override
+	public void postSubmit() {
+		// verify that we counted exactly right
+		for (Entry<Integer, Long> sum : OnceFailingPartitionedSum.allSums.entrySet()) {
+			assertEquals(new Long(sum.getKey() * NUM_STRINGS / NUM_KEYS), sum.getValue());
+		}
+		for (Long count : CounterSink.allCounts.values()) {
+			assertEquals(new Long(NUM_STRINGS / NUM_KEYS), count);
+		}
+
+		assertEquals(NUM_KEYS, CounterSink.allCounts.size());
+		assertEquals(NUM_KEYS, OnceFailingPartitionedSum.allSums.size());
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Custom Functions
+	// --------------------------------------------------------------------------------------------
+
+	private static class IntGeneratingSourceFunction extends RichParallelSourceFunction<Integer>
+			implements Checkpointed<Integer> {
+
+		private final long numElements;
+
+		private int index;
+		private int step;
+
+		private Random rnd = new Random();
+
+		private volatile boolean isRunning = true;
+
+		static final long[] counts = new long[PARALLELISM];
+
+		@Override
+		public void close() throws IOException {
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = index;
+		}
+
+		IntGeneratingSourceFunction(long numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws IOException {
+			step = getRuntimeContext().getNumberOfParallelSubtasks();
+			if (index == 0) {
+				index = getRuntimeContext().getIndexOfThisSubtask();
+			}
+		}
+
+		@Override
+		public void run(SourceContext<Integer> ctx) throws Exception {
+			final Object lockingObject = ctx.getCheckpointLock();
+
+			while (isRunning && index < numElements) {
+
+				synchronized (lockingObject) {
+					index += step;
+					ctx.collect(index % NUM_KEYS);
+				}
+
+				if (rnd.nextDouble() < 0.008) {
+					Thread.sleep(1);
+				}
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+			return index;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			index = state;
+		}
+	}
+
+	private static class OnceFailingPartitionedSum extends RichMapFunction<Integer, Tuple2<Integer, Long>> {
+
+		private static Map<Integer, Long> allSums = new ConcurrentHashMap<Integer, Long>();
+
+		private static volatile boolean hasFailed = false;
+
+		private final long numElements;
+
+		private long failurePos;
+		private long count;
+
+		private OperatorState<Long> sum;
+
+		OnceFailingPartitionedSum(long numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws IOException {
+			long failurePosMin = (long) (0.6 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+			long failurePosMax = (long) (0.8 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+
+			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+			count = 0;
+			sum = getRuntimeContext().getKeyValueState("my_state", Long.class, 0L);
+		}
+
+		@Override
+		public Tuple2<Integer, Long> map(Integer value) throws Exception {
+			count++;
+			if (!hasFailed && count >= failurePos) {
+				hasFailed = true;
+				throw new Exception("Test Failure");
+			}
+
+			long currentSum = sum.value() + value;
+			sum.update(currentSum);
+			allSums.put(value, currentSum);
+			return new Tuple2<Integer, Long>(value, currentSum);
+		}
+	}
+
+	private static class CounterSink extends RichSinkFunction<Tuple2<Integer, Long>> {
+
+		private static Map<Integer, Long> allCounts = new ConcurrentHashMap<Integer, Long>();
+
+		private OperatorState<NonSerializableLong> aCounts;
+		private OperatorState<Long> bCounts;
+
+		@Override
+		public void open(Configuration parameters) throws IOException {
+			aCounts = getRuntimeContext().getKeyValueState("a", NonSerializableLong.class, NonSerializableLong.of(0L));
+			bCounts = getRuntimeContext().getKeyValueState("b", Long.class, 0L);
+		}
+
+		@Override
+		public void invoke(Tuple2<Integer, Long> value) throws Exception {
+			long ac = aCounts.value().value;
+			long bc = bCounts.value();
+			assertEquals(ac, bc);
+
+			long currentCount = ac + 1;
+			aCounts.update(NonSerializableLong.of(currentCount));
+			bCounts.update(currentCount);
+
+			allCounts.put(value.f0, currentCount);
+		}
+	}
+}

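The test above runs against two embedded Derby shards via DerbyAdapter; against a real MySQL setup the same configuration surface would be used with the default MySqlAdapter. A configuration sketch, in which the hostnames, credentials, database names, and intervals are placeholder assumptions, might be:

import java.util.Arrays;

import org.apache.flink.contrib.streaming.state.DbBackendConfig;
import org.apache.flink.contrib.streaming.state.DbStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlBackendSetupSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(5000);

		// User, password, and the list of shard connection URLs, as in the test above;
		// key-value state is partitioned across the listed shards.
		DbBackendConfig conf = new DbBackendConfig("user", "password",
				Arrays.asList(
						"jdbc:mysql://host1:3306/flinkstate",
						"jdbc:mysql://host2:3306/flinkstate"));
		// Compact the kvstate tables periodically (here: every 10 checkpoints).
		conf.setKvStateCompactionFrequency(10);

		// Non-partitioned state still goes to an in-memory backend, as in the test above.
		env.setStateBackend(new DbStateBackend(conf, new MemoryStateBackend()));
	}
}
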
