flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [11/11] flink git commit: [FLINK-2976] [streaming-contrib] Use ApplicationID in DbStateBackend instead of JobID
Date Mon, 11 Jan 2016 15:31:32 GMT
[FLINK-2976] [streaming-contrib] Use ApplicationID in DbStateBackend instead of JobID

[comments] Set larger timeout for future when triggering savepoint


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/77348858
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/77348858
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/77348858

Branch: refs/heads/master
Commit: 77348858f4632e9b3987cf1ed6a9c8af4c5e1aeb
Parents: 3607575
Author: Ufuk Celebi <uce@apache.org>
Authored: Thu Dec 3 11:35:46 2015 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon Jan 11 16:31:03 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 79 +++++++++++---------
 .../main/java/org/apache/flink/contrib/.hidden  |  0
 .../contrib/streaming/state/DbAdapter.java      | 31 ++++----
 .../contrib/streaming/state/DbStateBackend.java | 18 +++--
 .../contrib/streaming/state/DbStateHandle.java  | 10 +--
 .../contrib/streaming/state/LazyDbKvState.java  |  2 +-
 .../contrib/streaming/state/MySqlAdapter.java   | 24 +++---
 .../test/java/org/apache/flink/contrib/.hidden  |  0
 .../streaming/state/DbStateBackendTest.java     | 12 +--
 .../contrib/streaming/state/DerbyAdapter.java   |  4 +-
 10 files changed, 97 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/77348858/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 0363d6a..7e1cef7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -18,41 +18,23 @@
 
 package org.apache.flink.client;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.FileNotFoundException;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
 import akka.actor.ActorSystem;
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.cli.CancelOptions;
 import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CliFrontendParser;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.cli.InfoOptions;
 import org.apache.flink.client.cli.ListOptions;
 import org.apache.flink.client.cli.ProgramOptions;
 import org.apache.flink.client.cli.RunOptions;
+import org.apache.flink.client.cli.SavepointOptions;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -69,28 +51,52 @@ import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
+import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
 import org.apache.flink.util.StringUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.Some;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
+import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
+import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
+import static org.apache.flink.runtime.messages.JobManagerMessages.getRequestRunningJobsStatus;
+
 /**
  * Implementation of a simple command line frontend for executing programs.
  */
@@ -486,7 +492,7 @@ public class CliFrontend {
 
 			LOG.info("Connecting to JobManager to retrieve list of jobs");
 			Future<Object> response = jobManagerGateway.ask(
-					JobManagerMessages.getRequestRunningJobsStatus(),
+					getRequestRunningJobsStatus(),
 					askTimeout);
 
 			Object result;
@@ -694,12 +700,15 @@ public class CliFrontend {
 	private int triggerSavepoint(SavepointOptions options, JobID jobId) {
 		try {
 			ActorGateway jobManager = getJobManagerGateway(options);
-			Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId), askTimeout);
+
+			logAndSysout("Triggering savepoint for job " + jobId + ".");
+			Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId),
+					new FiniteDuration(1, TimeUnit.HOURS));
 
 			Object result;
 			try {
-				logAndSysout("Triggering savepoint for job " + jobId + ". Waiting for response...");
-				result = Await.result(response, askTimeout);
+				logAndSysout("Waiting for response...");
+				result = Await.result(response, FiniteDuration.Inf());
 			}
 			catch (Exception e) {
 				throw new Exception("Triggering a savepoint for the job " + jobId + " failed.", e);
@@ -733,11 +742,12 @@ public class CliFrontend {
 	private int disposeSavepoint(SavepointOptions options, String savepointPath) {
 		try {
 			ActorGateway jobManager = getJobManagerGateway(options);
+			logAndSysout("Disposing savepoint '" + savepointPath + "'.");
 			Future<Object> response = jobManager.ask(new DisposeSavepoint(savepointPath), askTimeout);
 
 			Object result;
 			try {
-				logAndSysout("Disposing savepoint '" + savepointPath + "'. Waiting for response...");
+				logAndSysout("Waiting for response...");
 				result = Await.result(response, askTimeout);
 			}
 			catch (Exception e) {
@@ -1118,7 +1128,8 @@ public class CliFrontend {
 								return CliFrontend.this.run(params);
 							}
 						});
-					} catch (Exception e) {
+					}
+					catch (Exception e) {
 						return handleError(e);
 					}
 				}
@@ -1130,7 +1141,7 @@ public class CliFrontend {
 			case ACTION_CANCEL:
 				return cancel(params);
 			case ACTION_SAVEPOINT:
-				return savepoint(params)
+				return savepoint(params);
 			case "-h":
 			case "--help":
 				CliFrontendParser.printHelp();

http://git-wip-us.apache.org/repos/asf/flink/blob/77348858/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/.hidden
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/.hidden b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/.hidden
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/flink/blob/77348858/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
index 26c27dd..491451a 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
@@ -29,16 +29,15 @@ import org.apache.flink.api.java.tuple.Tuple2;
 /**
  * Adapter interface for executing different checkpointing related operations on
  * the underlying database.
- *
  */
 public interface DbAdapter extends Serializable {
 
 	/**
 	 * Initialize tables for storing non-partitioned checkpoints for the given
-	 * job id and database connection.
+	 * application id and database connection.
 	 * 
 	 */
-	void createCheckpointsTable(String jobId, Connection con) throws SQLException;
+	void createCheckpointsTable(String appId, Connection con) throws SQLException;
 
 	/**
 	 * Checkpoints will be inserted in the database using prepared statements.
@@ -46,14 +45,14 @@ public interface DbAdapter extends Serializable {
 	 * later to insert using the given connection.
 	 * 
 	 */
-	PreparedStatement prepareCheckpointInsert(String jobId, Connection con) throws SQLException;
+	PreparedStatement prepareCheckpointInsert(String appId, Connection con) throws SQLException;
 
 	/**
 	 * Set the {@link PreparedStatement} parameters for the statement returned
 	 * by {@link #prepareCheckpointInsert(String, Connection)}.
 	 * 
-	 * @param jobId
-	 *            Id of the current job.
+	 * @param appId
+	 *            Id of the current application.
 	 * @param insertStatement
 	 *            Statement returned by
 	 *            {@link #prepareCheckpointInsert(String, Connection)}.
@@ -68,14 +67,14 @@ public interface DbAdapter extends Serializable {
 	 *            The serialized checkpoint.
 	 * @throws SQLException
 	 */
-	void setCheckpointInsertParams(String jobId, PreparedStatement insertStatement, long checkpointId,
+	void setCheckpointInsertParams(String appId, PreparedStatement insertStatement, long checkpointId,
 			long timestamp, long handleId, byte[] checkpoint) throws SQLException;
 
 	/**
 	 * Retrieve the serialized checkpoint data from the database.
 	 * 
-	 * @param jobId
-	 *            Id of the current job.
+	 * @param appId
+	 *            Id of the current application.
 	 * @param con
 	 *            Database connection
 	 * @param checkpointId
@@ -88,14 +87,14 @@ public interface DbAdapter extends Serializable {
 	 * @return The byte[] corresponding to the checkpoint or null if missing.
 	 * @throws SQLException
 	 */
-	byte[] getCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
+	byte[] getCheckpoint(String appId, Connection con, long checkpointId, long checkpointTs, long handleId)
 			throws SQLException;
 
 	/**
 	 * Remove the given checkpoint from the database.
 	 * 
-	 * @param jobId
-	 *            Id of the current job.
+	 * @param appId
+	 *            Id of the current application.
 	 * @param con
 	 *            Database connection
 	 * @param checkpointId
@@ -108,16 +107,16 @@ public interface DbAdapter extends Serializable {
 	 * @return The byte[] corresponding to the checkpoint or null if missing.
 	 * @throws SQLException
 	 */
-	void deleteCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
+	void deleteCheckpoint(String appId, Connection con, long checkpointId, long checkpointTs, long handleId)
 			throws SQLException;
 
 	/**
-	 * Remove all states for the given JobId, by for instance dropping the
-	 * entire table.
+	 * Remove all states for the given {@link org.apache.flink.api.common.ApplicationID},
+	 * by for instance dropping the entire table.
 	 * 
 	 * @throws SQLException
 	 */
-	void disposeAllStateForJob(String jobId, Connection con) throws SQLException;
+	void disposeAllStateForJob(String appId, Connection con) throws SQLException;
 
 	/**
 	 * Initialize the necessary tables for the given stateId. The state id

http://git-wip-us.apache.org/repos/asf/flink/blob/77348858/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
index ef45a4b..ad5ec56 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
@@ -147,16 +147,20 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
 					// We create a unique long id for each handle, but we also
 					// store the checkpoint id and timestamp for bookkeeping
 					long handleId = rnd.nextLong();
-					String jobIdShort = env.getJobID().toShortString();
+
+					// We use the ApplicationID here, because it is restored when
+					// the job is started from a savepoint (whereas the job ID
+					// changes with each submission).
+					String appIdShort = env.getApplicationID().toShortString();
 
 					byte[] serializedState = InstantiationUtil.serializeObject(state);
-					dbAdapter.setCheckpointInsertParams(jobIdShort, insertStatement,
+					dbAdapter.setCheckpointInsertParams(appIdShort, insertStatement,
 							checkpointID, timestamp, handleId,
 							serializedState);
 
 					insertStatement.executeUpdate();
 
-					return new DbStateHandle<S>(jobIdShort, checkpointID, timestamp, handleId,
+					return new DbStateHandle<S>(appIdShort, checkpointID, timestamp, handleId,
 							dbConfig, serializedState.length);
 				}
 			}, numSqlRetries, sqlRetrySleep);
@@ -181,7 +185,7 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
 	public <K, V> LazyDbKvState<K, V> createKvState(String stateId, String stateName,
 			TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws IOException {
 		return new LazyDbKvState<K, V>(
-				stateId + "_" + env.getJobID().toShortString(),
+				stateId + "_" + env.getApplicationID().toShortString(),
 				env.getTaskInfo().getIndexOfThisSubtask() == 0,
 				getConnections(),
 				getConfiguration(),
@@ -211,8 +215,8 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
 		if (nonPartitionedStateBackend == null) {
 			insertStatement = retry(new Callable<PreparedStatement>() {
 				public PreparedStatement call() throws SQLException {
-					dbAdapter.createCheckpointsTable(env.getJobID().toShortString(), getConnections().getFirst());
-					return dbAdapter.prepareCheckpointInsert(env.getJobID().toShortString(),
+					dbAdapter.createCheckpointsTable(env.getApplicationID().toShortString(), getConnections().getFirst());
+					return dbAdapter.prepareCheckpointInsert(env.getApplicationID().toShortString(),
 							getConnections().getFirst());
 				}
 			}, numSqlRetries, sqlRetrySleep);
@@ -241,7 +245,7 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
 	@Override
 	public void disposeAllStateForCurrentJob() throws Exception {
 		if (nonPartitionedStateBackend == null) {
-			dbAdapter.disposeAllStateForJob(env.getJobID().toShortString(), connections.getFirst());
+			dbAdapter.disposeAllStateForJob(env.getApplicationID().toShortString(), connections.getFirst());
 		} else {
 			nonPartitionedStateBackend.disposeAllStateForCurrentJob();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/77348858/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
index cc42b3f..8a4a266 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
@@ -38,7 +38,7 @@ public class DbStateHandle<S> implements Serializable, StateHandle<S> {
 	private static final long serialVersionUID = 1L;
 	private static final Logger LOG = LoggerFactory.getLogger(DbStateHandle.class);
 
-	private final String jobId;
+	private final String appId;
 	private final DbBackendConfig dbConfig;
 
 	private final long checkpointId;
@@ -49,7 +49,7 @@ public class DbStateHandle<S> implements Serializable, StateHandle<S> {
 	private final long stateSize;
 
 	public DbStateHandle(
-			String jobId,
+			String appId,
 			long checkpointId,
 			long checkpointTs,
 			long handleId,
@@ -58,7 +58,7 @@ public class DbStateHandle<S> implements Serializable, StateHandle<S> {
 
 		this.checkpointId = checkpointId;
 		this.handleId = handleId;
-		this.jobId = jobId;
+		this.appId = appId;
 		this.dbConfig = dbConfig;
 		this.checkpointTs = checkpointTs;
 		this.stateSize = stateSize;
@@ -68,7 +68,7 @@ public class DbStateHandle<S> implements Serializable, StateHandle<S> {
 		return retry(new Callable<byte[]>() {
 			public byte[] call() throws Exception {
 				try (ShardedConnection con = dbConfig.createShardedConnection()) {
-					return dbConfig.getDbAdapter().getCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId);
+					return dbConfig.getDbAdapter().getCheckpoint(appId, con.getFirst(), checkpointId, checkpointTs, handleId);
 				}
 			}
 		}, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries());
@@ -80,7 +80,7 @@ public class DbStateHandle<S> implements Serializable, StateHandle<S> {
 			retry(new Callable<Boolean>() {
 				public Boolean call() throws Exception {
 					try (ShardedConnection con = dbConfig.createShardedConnection()) {
-						dbConfig.getDbAdapter().deleteCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId);
+						dbConfig.getDbAdapter().deleteCheckpoint(appId, con.getFirst(), checkpointId, checkpointTs, handleId);
 					}
 					return true;
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/77348858/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java
index 87a1f57..5d16be6 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java
@@ -57,7 +57,7 @@ public class LazyDbKvState<K, V> implements KvState<K, V, DbStateBackend>,
Check
 
 	// ------------------------------------------------------
 
-	// Unique id for this state (jobID_operatorID_stateName)
+	// Unique id for this state (appID_operatorID_stateName)
 	private final String kvStateId;
 	private final boolean compact;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/77348858/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
index 9eaa283..9eb3cd5 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
@@ -45,10 +45,10 @@ public class MySqlAdapter implements DbAdapter {
 	// -----------------------------------------------------------------------------
 
 	@Override
-	public void createCheckpointsTable(String jobId, Connection con) throws SQLException {
+	public void createCheckpointsTable(String appId, Connection con) throws SQLException {
 		try (Statement smt = con.createStatement()) {
 			smt.executeUpdate(
-					"CREATE TABLE IF NOT EXISTS checkpoints_" + jobId
+					"CREATE TABLE IF NOT EXISTS checkpoints_" + appId
 							+ " ("
 							+ "checkpointId bigint, "
 							+ "timestamp bigint, "
@@ -61,15 +61,15 @@ public class MySqlAdapter implements DbAdapter {
 	}
 
 	@Override
-	public PreparedStatement prepareCheckpointInsert(String jobId, Connection con) throws SQLException {
+	public PreparedStatement prepareCheckpointInsert(String appId, Connection con) throws SQLException {
 		return con.prepareStatement(
-				"INSERT INTO checkpoints_" + jobId
+				"INSERT INTO checkpoints_" + appId
 						+ " (checkpointId, timestamp, handleId, checkpoint) VALUES (?,?,?,?)");
 	}
 
 	@Override
-	public void setCheckpointInsertParams(String jobId, PreparedStatement insertStatement, long checkpointId,
-			long timestamp, long handleId, byte[] checkpoint) throws SQLException {
+	public void setCheckpointInsertParams(String appId, PreparedStatement insertStatement, long checkpointId,
+										long timestamp, long handleId, byte[] checkpoint) throws SQLException {
 		insertStatement.setLong(1, checkpointId);
 		insertStatement.setLong(2, timestamp);
 		insertStatement.setLong(3, handleId);
@@ -77,11 +77,11 @@ public class MySqlAdapter implements DbAdapter {
 	}
 
 	@Override
-	public byte[] getCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
+	public byte[] getCheckpoint(String appId, Connection con, long checkpointId, long checkpointTs, long handleId)
 			throws SQLException {
 		try (Statement smt = con.createStatement()) {
 			ResultSet rs = smt.executeQuery(
-					"SELECT checkpoint FROM checkpoints_" + jobId
+					"SELECT checkpoint FROM checkpoints_" + appId
 							+ " WHERE handleId = " + handleId);
 			if (rs.next()) {
 				return rs.getBytes(1);
@@ -92,20 +92,20 @@ public class MySqlAdapter implements DbAdapter {
 	}
 
 	@Override
-	public void deleteCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
+	public void deleteCheckpoint(String appId, Connection con, long checkpointId, long checkpointTs, long handleId)
 			throws SQLException {
 		try (Statement smt = con.createStatement()) {
 			smt.executeUpdate(
-					"DELETE FROM checkpoints_" + jobId
+					"DELETE FROM checkpoints_" + appId
 							+ " WHERE handleId = " + handleId);
 		}
 	}
 
 	@Override
-	public void disposeAllStateForJob(String jobId, Connection con) throws SQLException {
+	public void disposeAllStateForJob(String appId, Connection con) throws SQLException {
 		try (Statement smt = con.createStatement()) {
 			smt.executeUpdate(
-					"DROP TABLE checkpoints_" + jobId);
+					"DROP TABLE checkpoints_" + appId);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/77348858/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/.hidden
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/.hidden b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/.hidden
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/flink/blob/77348858/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
index 6664b6d..155ced8 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
@@ -133,11 +133,11 @@ public class DbStateBackendTest {
 
 		assertNotNull(backend.getConnections());
 		assertTrue(
-				isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
+				isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString()));
 
 		backend.disposeAllStateForCurrentJob();
 		assertFalse(
-				isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
+				isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString()));
 		backend.close();
 
 		assertTrue(backend.getConnections().getFirst().isClosed());
@@ -167,12 +167,12 @@ public class DbStateBackendTest {
 		assertEquals(state2, handle2.getState(getClass().getClassLoader()));
 		handle2.discardState();
 
-		assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
+		assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString()));
 
 		assertEquals(state3, handle3.getState(getClass().getClassLoader()));
 		handle3.discardState();
 
-		assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getJobID().toShortString()));
+		assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + env.getApplicationID().toShortString()));
 
 		backend.close();
 
@@ -196,7 +196,7 @@ public class DbStateBackendTest {
 			LazyDbKvState<Integer, String> kv = backend.createKvState("state1_1", "state1",
IntSerializer.INSTANCE,
 					StringSerializer.INSTANCE, null);
 
-			String tableName = "state1_1_" + env.getJobID().toShortString();
+			String tableName = "state1_1_" + env.getApplicationID().toShortString();
 			assertTrue(isTableCreated(backend.getConnections().getFirst(), tableName));
 
 			assertEquals(0, kv.size());
@@ -324,7 +324,7 @@ public class DbStateBackendTest {
 
 		Environment env = new DummyEnvironment("test", 2, 0);
 
-		String tableName = "state1_1_" + env.getJobID().toShortString();
+		String tableName = "state1_1_" + env.getApplicationID().toShortString();
 		assertFalse(isTableCreated(DriverManager.getConnection(url1, "flink", "flink"), tableName));
 		assertFalse(isTableCreated(DriverManager.getConnection(url2, "flink", "flink"), tableName));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/77348858/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
index 1f13f4b..02c1a3e 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
@@ -42,11 +42,11 @@ public class DerbyAdapter extends MySqlAdapter {
 	 * "IF NOT EXISTS" clause at table creation
 	 */
 	@Override
-	public void createCheckpointsTable(String jobId, Connection con) throws SQLException {
+	public void createCheckpointsTable(String appId, Connection con) throws SQLException {
 
 		try (Statement smt = con.createStatement()) {
 			smt.executeUpdate(
-					"CREATE TABLE checkpoints_" + jobId
+					"CREATE TABLE checkpoints_" + appId
 							+ " ("
 							+ "checkpointId bigint, "
 							+ "timestamp bigint, "


Mime
View raw message