flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [1/8] flink git commit: [FLINK-2924] [streaming] Execute compactions in background thread + keep connections alive on empty snapshots
Date Tue, 24 Nov 2015 09:45:03 GMT
Repository: flink
Updated Branches:
  refs/heads/master 8cabe67e7 -> db2a964a4


[FLINK-2924] [streaming] Execute compactions in background thread + keep connections alive on empty snapshots

Closes #1305


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db2a964a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db2a964a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db2a964a

Branch: refs/heads/master
Commit: db2a964a450c05cb2aad3843999d994e4b8e5ef5
Parents: cd8be0b
Author: Gyula Fora <gyfora@apache.org>
Authored: Sat Nov 21 18:29:55 2015 +0100
Committer: Gyula Fora <gyfora@apache.org>
Committed: Tue Nov 24 09:28:39 2015 +0100

----------------------------------------------------------------------
 .../contrib/streaming/state/DbAdapter.java      |   7 ++
 .../streaming/state/DbBackendConfig.java        |  76 ------------
 .../contrib/streaming/state/DbStateBackend.java |   4 +-
 .../contrib/streaming/state/DbStateHandle.java  |   8 +-
 .../contrib/streaming/state/LazyDbKvState.java  | 118 ++++++++++++++-----
 .../contrib/streaming/state/MySqlAdapter.java   |   7 ++
 .../streaming/state/DbStateBackendTest.java     |  95 ++++++++++-----
 .../flink/runtime/state/StateBackend.java       |   4 +-
 .../state/filesystem/FsStateBackend.java        |   2 +-
 .../state/memory/MemoryStateBackend.java        |   2 +-
 .../runtime/state/FileStateBackendTest.java     |   6 +-
 .../runtime/state/MemoryStateBackendTest.java   |   6 +-
 .../api/operators/AbstractStreamOperator.java   |   4 +-
 13 files changed, 188 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/db2a964a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
index 2162f32..26c27dd 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
@@ -178,4 +178,11 @@ public interface DbAdapter extends Serializable {
 	 */
 	void compactKvStates(String kvStateId, Connection con, long lowerTs, long upperTs) throws SQLException;
 
+	/**
+	 * Execute a simple operation to refresh the current database connection in
+	 * case no data is written for a longer time period. Usually something like
+	 * "select 1"
+	 */
+	void keepAlive(Connection con) throws SQLException;
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db2a964a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java
index 55ecf83..883b65a 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbBackendConfig.java
@@ -327,80 +327,4 @@ public class DbBackendConfig implements Serializable {
 			return new ShardedConnection(shardUrls, userName, userPassword, shardPartitioner);
 		}
 	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (this == obj) {
-			return true;
-		}
-		if (obj == null) {
-			return false;
-		}
-		if (!(obj instanceof DbBackendConfig)) {
-			return false;
-		}
-		DbBackendConfig other = (DbBackendConfig) obj;
-		if (JDBCDriver == null) {
-			if (other.JDBCDriver != null) {
-				return false;
-			}
-		} else if (!JDBCDriver.equals(other.JDBCDriver)) {
-			return false;
-		}
-		if (dbAdapter == null) {
-			if (other.dbAdapter != null) {
-				return false;
-			}
-		} else if (!dbAdapter.getClass().equals(other.dbAdapter.getClass())) {
-			return false;
-		}
-		if (kvStateCacheSize != other.kvStateCacheSize) {
-			return false;
-		}
-		if (kvStateCompactionFreq != other.kvStateCompactionFreq) {
-			return false;
-		}
-		if (Float.floatToIntBits(maxKvEvictFraction) != Float.floatToIntBits(other.maxKvEvictFraction)) {
-			return false;
-		}
-		if (maxKvInsertBatchSize != other.maxKvInsertBatchSize) {
-			return false;
-		}
-		if (maxNumberOfSqlRetries != other.maxNumberOfSqlRetries) {
-			return false;
-		}
-		if (shardPartitioner == null) {
-			if (other.shardPartitioner != null) {
-				return false;
-			}
-		} else if (!shardPartitioner.getClass().equals(other.shardPartitioner.getClass())) {
-			return false;
-		}
-		if (shardUrls == null) {
-			if (other.shardUrls != null) {
-				return false;
-			}
-		} else if (!shardUrls.equals(other.shardUrls)) {
-			return false;
-		}
-		if (sleepBetweenSqlRetries != other.sleepBetweenSqlRetries) {
-			return false;
-		}
-		if (userName == null) {
-			if (other.userName != null) {
-				return false;
-			}
-		} else if (!userName.equals(other.userName)) {
-			return false;
-		}
-		if (userPassword == null) {
-			if (other.userPassword != null) {
-				return false;
-			}
-		} else if (!userPassword.equals(other.userPassword)) {
-			return false;
-		}
-		return true;
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db2a964a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
index dce0df8..72482ae 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
@@ -177,10 +177,10 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
 	}
 
 	@Override
-	public <K, V> LazyDbKvState<K, V> createKvState(int operatorId, String stateName,
+	public <K, V> LazyDbKvState<K, V> createKvState(String stateId, String stateName,
 			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws IOException {
 		return new LazyDbKvState<K, V>(
-				stateName + "_" + operatorId + "_" + env.getJobID().toShortString(),
+				stateId + "_" + env.getJobID().toShortString(),
 				env.getIndexInSubtaskGroup() == 0,
 				getConnections(),
 				getConfiguration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/db2a964a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
index fa300a4..2ecfcc4 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
@@ -25,7 +25,8 @@ import java.util.concurrent.Callable;
 
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.InstantiationUtil;
-import org.eclipse.jetty.util.log.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * State handle implementation for storing checkpoints as byte arrays in
@@ -35,6 +36,7 @@ import org.eclipse.jetty.util.log.Log;
 public class DbStateHandle<S> implements Serializable, StateHandle<S> {
 
 	private static final long serialVersionUID = 1L;
+	private static final Logger LOG = LoggerFactory.getLogger(DbStateHandle.class);
 
 	private final String jobId;
 	private final DbBackendConfig dbConfig;
@@ -75,8 +77,8 @@ public class DbStateHandle<S> implements Serializable, StateHandle<S> {
 			}, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries());
 		} catch (IOException e) {
 			// We don't want to fail the job here, but log the error.
-			if (Log.isDebugEnabled()) {
-				Log.debug("Could not discard state.");
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Could not discard state.");
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/db2a964a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java
index 12a3332..3d7abff 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java
@@ -30,6 +30,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -75,6 +77,8 @@ public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend>, Check
 	private final int maxInsertBatchSize;
 	// We will do database compaction every so many checkpoints
 	private final int compactEvery;
+	// Executor for automatic compactions
+	private ExecutorService executor = null;
 
 	// Database properties
 	private final DbBackendConfig conf;
@@ -96,7 +100,7 @@ public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend>, Check
 	private long nextTs;
 	private Map<Long, Long> completedCheckpoints = new HashMap<>();
 
-	private long lastCompactedTs;
+	private volatile long lastCompactedTs;
 
 	// ------------------------------------------------------
 
@@ -119,6 +123,10 @@ public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend>, Check
 
 		this.kvStateId = kvStateId;
 		this.compact = compact;
+		if (compact) {
+			// Compactions will run in a seperate thread
+			executor = Executors.newSingleThreadExecutor();
+		}
 
 		this.keySerializer = keySerializer;
 		this.valueSerializer = valueSerializer;
@@ -186,13 +194,28 @@ public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend>, Check
 					+ "this should not happen.");
 		}
 
-		// We insert the modified elements to the database with the current
-		// timestamp then clear the modified states
-		for (Entry<K, Optional<V>> state : cache.modified.entrySet()) {
-			batchInsert.add(state, timestamp);
+		// If there are any modified states we perform the inserts
+		if (!cache.modified.isEmpty()) {
+			// We insert the modified elements to the database with the current
+			// timestamp then clear the modified states
+			for (Entry<K, Optional<V>> state : cache.modified.entrySet()) {
+				batchInsert.add(state, timestamp);
+			}
+			batchInsert.flush(timestamp);
+			cache.modified.clear();
+		} else if (compact) {
+			// Otherwise we call the keep alive method to avoid dropped
+			// connections (only call this on the compactor instance)
+			for (final Connection c : connections.connections()) {
+				SQLRetrier.retry(new Callable<Void>() {
+					@Override
+					public Void call() throws Exception {
+						dbAdapter.keepAlive(c);
+						return null;
+					}
+				}, numSqlRetries, sqlRetrySleep);
+			}
 		}
-		batchInsert.flush(timestamp);
-		cache.modified.clear();
 
 		nextTs = timestamp + 1;
 		completedCheckpoints.put(checkpointId, timestamp);
@@ -240,23 +263,14 @@ public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend>, Check
 
 	@Override
 	public void notifyCheckpointComplete(long checkpointId) {
-		Long ts = completedCheckpoints.remove(checkpointId);
+		final Long ts = completedCheckpoints.remove(checkpointId);
 		if (ts == null) {
 			LOG.warn("Complete notification for missing checkpoint: " + checkpointId);
-			ts = 0L;
-		}
-		// If compaction is turned on we compact on the first subtask
-		if (compactEvery > 0 && compact && checkpointId % compactEvery == 0) {
-			try {
-				for (Connection c : connections.connections()) {
-					dbAdapter.compactKvStates(kvStateId, c, lastCompactedTs, ts);
-				}
-				lastCompactedTs = ts;
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("State succesfully compacted for {}.", kvStateId);
-				}
-			} catch (SQLException e) {
-				LOG.warn("State compaction failed due: {}", e);
+		} else {
+			// If compaction is turned on we compact on the compactor subtask
+			// asynchronously in the background
+			if (compactEvery > 0 && compact && checkpointId % compactEvery == 0) {
+				executor.execute(new Compactor(ts));
 			}
 		}
 	}
@@ -275,6 +289,10 @@ public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend>, Check
 		} catch (SQLException e) {
 			// There is not much to do about this
 		}
+
+		if (executor != null) {
+			executor.shutdown();
+		}
 	}
 
 	/**
@@ -294,15 +312,25 @@ public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend>, Check
 		return cache.modified;
 	}
 
-	public boolean isCompacter() {
+	/**
+	 * Used for testing purposes
+	 */
+	public boolean isCompactor() {
 		return compact;
 	}
 
 	/**
-	 * Snapshot that stores a specific checkpoint timestamp and state id, and also
-	 * rolls back the database to that point upon restore. The rollback is done
-	 * by removing all state checkpoints that have timestamps between the checkpoint
-	 * and recovery timestamp.
+	 * Used for testing purposes
+	 */
+	public ExecutorService getExecutor() {
+		return executor;
+	}
+
+	/**
+	 * Snapshot that stores a specific checkpoint timestamp and state id, and
+	 * also rolls back the database to that point upon restore. The rollback is
+	 * done by removing all state checkpoints that have timestamps between the
+	 * checkpoint and recovery timestamp.
 	 *
 	 */
 	private static class DbKvStateSnapshot<K, V> implements KvStateSnapshot<K, V, DbStateBackend> {
@@ -557,4 +585,40 @@ public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend>, Check
 
 		}
 	}
+
+	private class Compactor implements Runnable {
+
+		private long upperBound;
+
+		public Compactor(long upperBound) {
+			this.upperBound = upperBound;
+		}
+
+		@Override
+		public void run() {
+			// We create new database connections to make sure we don't
+			// interfere with the checkpointing (connections are not thread
+			// safe)
+			try (ShardedConnection sc = conf.createShardedConnection()) {
+				for (final Connection c : sc.connections()) {
+					SQLRetrier.retry(new Callable<Void>() {
+						@Override
+						public Void call() throws Exception {
+							dbAdapter.compactKvStates(kvStateId, c, lastCompactedTs, upperBound);
+							return null;
+						}
+					}, numSqlRetries, sqlRetrySleep);
+				}
+				if (LOG.isInfoEnabled()) {
+					LOG.info("State succesfully compacted for {} between {} and {}.", kvStateId,
+							lastCompactedTs,
+							upperBound);
+				}
+				lastCompactedTs = upperBound;
+			} catch (SQLException | IOException e) {
+				LOG.warn("State compaction failed due: {}", e);
+			}
+		}
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db2a964a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
index 7d3eca0..9eaa283 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
@@ -234,5 +234,12 @@ public class MySqlAdapter implements DbAdapter {
 			insertStatement.setNull(4, Types.BLOB);
 		}
 	}
+	
+	@Override
+	public void keepAlive(Connection con) throws SQLException {
+		try(Statement smt = con.createStatement()) {
+			smt.executeQuery("SELECT 1");
+		}
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db2a964a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
index 5f8610e..209086f 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
@@ -30,13 +30,14 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.derby.drda.NetworkServerControl;
@@ -51,7 +52,6 @@ import org.apache.flink.runtime.state.KvStateSnapshot;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.shaded.com.google.common.collect.Lists;
-import org.apache.flink.util.InstantiationUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -100,16 +100,17 @@ public class DbStateBackendTest {
 		// serialize / copy the backend
 		DbStateBackend backend = CommonTestUtils.createCopySerializable(dbBackend);
 		assertFalse(backend.isInitialized());
-		assertEquals(dbBackend.getConfiguration(), backend.getConfiguration());
 
 		Environment env = new DummyEnvironment("test", 1, 0);
 		backend.initializeForJob(env);
 
 		assertNotNull(backend.getConnections());
-		assertTrue(isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
+		assertTrue(
+				isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
 
 		backend.disposeAllStateForCurrentJob();
-		assertFalse(isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
+		assertFalse(
+				isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
 		backend.close();
 
 		assertTrue(backend.getConnections().getFirst().isClosed());
@@ -165,7 +166,7 @@ public class DbStateBackendTest {
 
 			backend.initializeForJob(env);
 
-			LazyDbKvState<Integer, String> kv = backend.createKvState(1, "state1", IntSerializer.INSTANCE,
+			LazyDbKvState<Integer, String> kv = backend.createKvState("state1_1", "state1", IntSerializer.INSTANCE,
 					StringSerializer.INSTANCE, null);
 
 			String tableName = "state1_1_" + env.getJobID().toShortString();
@@ -196,19 +197,9 @@ public class DbStateBackendTest {
 			kv.setCurrentKey(3);
 			kv.update("u3");
 
-			assertTrue(containsKey(backend.getConnections().getFirst(), tableName, 1, 100));
-
-			kv.notifyCheckpointComplete(682375462378L);
-
 			// draw another snapshot
 			KvStateSnapshot<Integer, String, DbStateBackend> snapshot2 = kv.snapshot(682375462379L,
 					200);
-			assertTrue(containsKey(backend.getConnections().getFirst(), tableName, 1, 100));
-			assertTrue(containsKey(backend.getConnections().getFirst(), tableName, 1, 200));
-			kv.notifyCheckpointComplete(682375462379L);
-			// Compaction should be performed
-			assertFalse(containsKey(backend.getConnections().getFirst(), tableName, 1, 100));
-			assertTrue(containsKey(backend.getConnections().getFirst(), tableName, 1, 200));
 
 			// validate the original state
 			assertEquals(3, kv.size());
@@ -238,9 +229,11 @@ public class DbStateBackendTest {
 	}
 
 	@Test
-	public void testCleanupTasks() throws Exception {
+	public void testCompaction() throws Exception {
 		DbBackendConfig conf = new DbBackendConfig("flink", "flink", url1);
-		conf.setDbAdapter(new DerbyAdapter());
+		MockAdapter adapter = new MockAdapter();
+		conf.setKvStateCompactionFrequency(2);
+		conf.setDbAdapter(adapter);
 
 		DbStateBackend backend1 = new DbStateBackend(conf);
 		DbStateBackend backend2 = new DbStateBackend(conf);
@@ -250,9 +243,40 @@ public class DbStateBackendTest {
 		backend2.initializeForJob(new DummyEnvironment("test", 3, 1));
 		backend3.initializeForJob(new DummyEnvironment("test", 3, 2));
 
-		assertTrue(backend1.createKvState(1, "a", null, null, null).isCompacter());
-		assertFalse(backend2.createKvState(1, "a", null, null, null).isCompacter());
-		assertFalse(backend3.createKvState(1, "a", null, null, null).isCompacter());
+		LazyDbKvState<?, ?> s1 = backend1.createKvState("a_1", "a", null, null, null);
+		LazyDbKvState<?, ?> s2 = backend2.createKvState("a_1", "a", null, null, null);
+		LazyDbKvState<?, ?> s3 = backend3.createKvState("a_1", "a", null, null, null);
+
+		assertTrue(s1.isCompactor());
+		assertFalse(s2.isCompactor());
+		assertFalse(s3.isCompactor());
+		assertNotNull(s1.getExecutor());
+		assertNull(s2.getExecutor());
+		assertNull(s3.getExecutor());
+
+		s1.snapshot(1, 100);
+		s1.notifyCheckpointComplete(1);
+		s1.snapshot(2, 200);
+		s1.snapshot(3, 300);
+		s1.notifyCheckpointComplete(2);
+		s1.notifyCheckpointComplete(3);
+		s1.snapshot(4, 400);
+		s1.snapshot(5, 500);
+		s1.notifyCheckpointComplete(4);
+		s1.notifyCheckpointComplete(5);
+
+		s1.dispose();
+		s2.dispose();
+		s3.dispose();
+
+		// Wait until the compaction completes
+		s1.getExecutor().awaitTermination(5, TimeUnit.SECONDS);
+		assertEquals(2, adapter.numCompcations.get());
+		assertEquals(5, adapter.keptAlive);
+
+		backend1.close();
+		backend2.close();
+		backend3.close();
 	}
 
 	@Test
@@ -279,7 +303,7 @@ public class DbStateBackendTest {
 
 		backend.initializeForJob(env);
 
-		LazyDbKvState<Integer, String> kv = backend.createKvState(1, "state1", IntSerializer.INSTANCE,
+		LazyDbKvState<Integer, String> kv = backend.createKvState("state1_1", "state1", IntSerializer.INSTANCE,
 				StringSerializer.INSTANCE, "a");
 
 		assertTrue(isTableCreated(DriverManager.getConnection(url1, "flink", "flink"), tableName));
@@ -423,16 +447,6 @@ public class DbStateBackendTest {
 		}
 	}
 
-	private static boolean containsKey(Connection con, String tableName, int key, long ts)
-			throws SQLException, IOException {
-		try (PreparedStatement smt = con
-				.prepareStatement("select * from " + tableName + " where k=? and timestamp=?")) {
-			smt.setBytes(1, InstantiationUtil.serializeToByteArray(IntSerializer.INSTANCE, key));
-			smt.setLong(2, ts);
-			return smt.executeQuery().next();
-		}
-	}
-
 	private static String localFileUri(File path) {
 		return path.toURI().toString();
 	}
@@ -444,4 +458,21 @@ public class DbStateBackendTest {
 		}
 	}
 
+	private static class MockAdapter extends DerbyAdapter {
+
+		private static final long serialVersionUID = 1L;
+		public AtomicInteger numCompcations = new AtomicInteger(0);
+		public int keptAlive = 0;
+
+		@Override
+		public void compactKvStates(String kvStateId, Connection con, long lowerTs, long upperTs) throws SQLException {
+			numCompcations.incrementAndGet();
+		}
+
+		@Override
+		public void keepAlive(Connection con) throws SQLException {
+			keptAlive++;
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db2a964a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index 6f72bce..293de95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -76,7 +76,7 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
 	/**
 	 * Creates a key/value state backed by this state backend.
 	 *
-	 * @param operatorId Unique id for the operator creating the state
+	 * @param stateId Unique id that identifies the kv state in the streaming program. 
 	 * @param stateName Name of the created state
 	 * @param keySerializer The serializer for the key.
 	 * @param valueSerializer The serializer for the value.
@@ -88,7 +88,7 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
 	 *
 	 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
 	 */
-	public abstract <K, V> KvState<K, V, Backend> createKvState(int operatorId, String stateName,
+	public abstract <K, V> KvState<K, V, Backend> createKvState(String stateId, String stateName,
 			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer,
 			V defaultValue) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db2a964a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 6a94a80..25c63e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -238,7 +238,7 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public <K, V> FsHeapKvState<K, V> createKvState(int operatorId, String stateName,
+	public <K, V> FsHeapKvState<K, V> createKvState(String stateId, String stateName,
 			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws Exception {
 		return new FsHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue, this);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/db2a964a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index f3e7552..2963237 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -83,7 +83,7 @@ public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public <K, V> MemHeapKvState<K, V> createKvState(int operatorId, String stateName,
+	public <K, V> MemHeapKvState<K, V> createKvState(String stateId, String stateName,
 			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) {
 		return new MemHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/db2a964a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index 4f10acd..37ccde2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -235,7 +235,7 @@ public class FileStateBackendTest {
 			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
 
 			KvState<Integer, String, FsStateBackend> kv =
-					backend.createKvState(0, "a", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+					backend.createKvState("0", "a", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
 
 			assertEquals(0, kv.size());
 
@@ -324,7 +324,7 @@ public class FileStateBackendTest {
 			File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
 
 			KvState<Integer, String, FsStateBackend> kv =
-					backend.createKvState(0, "a", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+					backend.createKvState("a_0", "a", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
 
 			kv.setCurrentKey(1);
 			kv.update("1");
@@ -394,7 +394,7 @@ public class FileStateBackendTest {
 			backend.initializeForJob(new DummyEnvironment("test", 0, 0));
 
 			KvState<Integer, IntValue, FsStateBackend> kv =
-					backend.createKvState(0, "a", IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
+					backend.createKvState("a_0", "a", IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
 
 			kv.setCurrentKey(1);
 			IntValue default1 = kv.value();

http://git-wip-us.apache.org/repos/asf/flink/blob/db2a964a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 8ffe617..4b5aebd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -147,7 +147,7 @@ public class MemoryStateBackendTest {
 			MemoryStateBackend backend = new MemoryStateBackend();
 
 			KvState<Integer, String, MemoryStateBackend> kv =
-					backend.createKvState(0, "s", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+					backend.createKvState("s_0", "s", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
 
 			assertEquals(0, kv.size());
 
@@ -222,7 +222,7 @@ public class MemoryStateBackendTest {
 		try {
 			MemoryStateBackend backend = new MemoryStateBackend();
 			KvState<Integer, String, MemoryStateBackend> kv =
-					backend.createKvState(0, "s", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+					backend.createKvState("s_0", "s", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
 
 			kv.setCurrentKey(1);
 			kv.update("1");
@@ -282,7 +282,7 @@ public class MemoryStateBackendTest {
 		try {
 			MemoryStateBackend backend = new MemoryStateBackend();
 			KvState<Integer, IntValue, MemoryStateBackend> kv =
-					backend.createKvState(0, "a", IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
+					backend.createKvState("a_0", "a", IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
 
 			kv.setCurrentKey(1);
 			IntValue default1 = kv.value();

http://git-wip-us.apache.org/repos/asf/flink/blob/db2a964a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 025b44a..3f1cfae 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -330,8 +330,10 @@ public abstract class AbstractStreamOperator<OUT>
 		}
 		
 		if (kvstate == null) {
+			// create unique state id from operator id + state name
+			String stateId = name + "_" + getOperatorConfig().getVertexID();
 			// create a new blank key/value state
-			kvstate = stateBackend.createKvState(getOperatorConfig().getVertexID() ,name , keySerializer, valueSerializer, defaultValue);
+			kvstate = stateBackend.createKvState(stateId ,name , keySerializer, valueSerializer, defaultValue);
 		}
 
 		if (keyValueStatesByName == null) {


Mime
View raw message