flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [2/2] flink git commit: [FLINK-3299] Remove ApplicationID from Environment
Date Wed, 17 Feb 2016 13:46:51 GMT
[FLINK-3299] Remove ApplicationID from Environment

This closes #1642.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50a166df
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50a166df
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50a166df

Branch: refs/heads/master
Commit: 50a166df3e96d4783a5ee35aefe07cfaea5927e6
Parents: e08d7a6
Author: Ufuk Celebi <uce@apache.org>
Authored: Thu Feb 11 16:04:08 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Feb 17 14:46:04 2016 +0100

----------------------------------------------------------------------
 .../contrib/streaming/state/DbAdapter.java      |  29 +++--
 .../contrib/streaming/state/DbStateBackend.java |  14 +--
 .../contrib/streaming/state/DbStateHandle.java  |  10 +-
 .../contrib/streaming/state/MySqlAdapter.java   |  22 ++--
 .../streaming/state/DbStateBackendTest.java     |  12 +-
 .../contrib/streaming/state/DerbyAdapter.java   |   4 +-
 .../apache/flink/api/common/ApplicationID.java  |  43 -------
 .../flink/runtime/checkpoint/Savepoint.java     |  57 ---------
 .../checkpoint/SavepointCoordinator.java        |  32 ++---
 .../runtime/checkpoint/SavepointStore.java      |  18 +--
 .../checkpoint/SavepointStoreFactory.java       |   4 +-
 .../deployment/TaskDeploymentDescriptor.java    |  18 +--
 .../flink/runtime/execution/Environment.java    |  11 --
 .../runtime/executiongraph/ExecutionGraph.java  |  23 +---
 .../executiongraph/ExecutionJobVertex.java      |   5 -
 .../runtime/executiongraph/ExecutionVertex.java |   7 +-
 .../runtime/taskmanager/RuntimeEnvironment.java |   9 --
 .../apache/flink/runtime/taskmanager/Task.java  |   8 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |   2 +-
 .../checkpoint/SavepointCoordinatorTest.java    | 125 +++++--------------
 .../checkpoint/SavepointStoreFactoryTest.java   |   2 +-
 .../TaskDeploymentDescriptorTest.java           |   5 +-
 .../operators/testutils/DummyEnvironment.java   |   7 --
 .../operators/testutils/MockEnvironment.java    |   8 --
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   3 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  32 +++--
 .../flink/runtime/taskmanager/TaskStopTest.java |   8 +-
 .../flink/runtime/taskmanager/TaskTest.java     |   3 +-
 .../TestingJobManagerMessages.scala             |   7 +-
 .../runtime/tasks/StreamMockEnvironment.java    |   8 --
 .../streaming/runtime/tasks/StreamTaskTest.java |   3 +-
 .../test/checkpointing/SavepointITCase.java     |  25 ++--
 33 files changed, 136 insertions(+), 430 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
index 491451a..9ab9c23 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbAdapter.java
@@ -34,10 +34,9 @@ public interface DbAdapter extends Serializable {
 
 	/**
 	 * Initialize tables for storing non-partitioned checkpoints for the given
-	 * application id and database connection.
-	 * 
+	 * job id and database connection.
 	 */
-	void createCheckpointsTable(String appId, Connection con) throws SQLException;
+	void createCheckpointsTable(String jobId, Connection con) throws SQLException;
 
 	/**
 	 * Checkpoints will be inserted in the database using prepared statements.
@@ -45,14 +44,14 @@ public interface DbAdapter extends Serializable {
 	 * later to insert using the given connection.
 	 * 
 	 */
-	PreparedStatement prepareCheckpointInsert(String appId, Connection con) throws SQLException;
+	PreparedStatement prepareCheckpointInsert(String jobId, Connection con) throws SQLException;
 
 	/**
 	 * Set the {@link PreparedStatement} parameters for the statement returned
 	 * by {@link #prepareCheckpointInsert(String, Connection)}.
 	 * 
-	 * @param appId
-	 *            Id of the current application.
+	 * @param jobId
+	 *            Id of the current job.
 	 * @param insertStatement
 	 *            Statement returned by
 	 *            {@link #prepareCheckpointInsert(String, Connection)}.
@@ -67,14 +66,14 @@ public interface DbAdapter extends Serializable {
 	 *            The serialized checkpoint.
 	 * @throws SQLException
 	 */
-	void setCheckpointInsertParams(String appId, PreparedStatement insertStatement, long checkpointId,
+	void setCheckpointInsertParams(String jobId, PreparedStatement insertStatement, long checkpointId,
 			long timestamp, long handleId, byte[] checkpoint) throws SQLException;
 
 	/**
 	 * Retrieve the serialized checkpoint data from the database.
 	 * 
-	 * @param appId
-	 *            Id of the current application.
+	 * @param jobId
+	 *            Id of the current job.
 	 * @param con
 	 *            Database connection
 	 * @param checkpointId
@@ -87,14 +86,14 @@ public interface DbAdapter extends Serializable {
 	 * @return The byte[] corresponding to the checkpoint or null if missing.
 	 * @throws SQLException
 	 */
-	byte[] getCheckpoint(String appId, Connection con, long checkpointId, long checkpointTs, long handleId)
+	byte[] getCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
 			throws SQLException;
 
 	/**
 	 * Remove the given checkpoint from the database.
 	 * 
-	 * @param appId
-	 *            Id of the current application.
+	 * @param jobId
+	 *            Id of the current job.
 	 * @param con
 	 *            Database connection
 	 * @param checkpointId
@@ -107,16 +106,16 @@ public interface DbAdapter extends Serializable {
 	 * @return The byte[] corresponding to the checkpoint or null if missing.
 	 * @throws SQLException
 	 */
-	void deleteCheckpoint(String appId, Connection con, long checkpointId, long checkpointTs, long handleId)
+	void deleteCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
 			throws SQLException;
 
 	/**
-	 * Remove all states for the given {@link org.apache.flink.api.common.ApplicationID},
+	 * Remove all states for the given {@link org.apache.flink.api.common.JobID},
 	 * by for instance dropping the entire table.
 	 * 
 	 * @throws SQLException
 	 */
-	void disposeAllStateForJob(String appId, Connection con) throws SQLException;
+	void disposeAllStateForJob(String jobId, Connection con) throws SQLException;
 
 	/**
 	 * Initialize the necessary tables for the given stateId. The state id

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
index d82bfb2..964c687 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
@@ -79,7 +79,7 @@ public class DbStateBackend extends AbstractStateBackend {
 
 	private transient Environment env;
 
-	private transient String appId;
+	private transient String jobId;
 
 	// ------------------------------------------------------
 
@@ -165,13 +165,13 @@ public class DbStateBackend extends AbstractStateBackend {
 					long handleId = rnd.nextLong();
 
 					byte[] serializedState = InstantiationUtil.serializeObject(state);
-					dbAdapter.setCheckpointInsertParams(appId, insertStatement,
+					dbAdapter.setCheckpointInsertParams(jobId, insertStatement,
 							checkpointID, timestamp, handleId,
 							serializedState);
 
 					insertStatement.executeUpdate();
 
-					return new DbStateHandle<>(appId, checkpointID, timestamp, handleId,
+					return new DbStateHandle<>(jobId, checkpointID, timestamp, handleId,
 							dbConfig, serializedState.length);
 				}
 			}, numSqlRetries, sqlRetrySleep);
@@ -268,7 +268,7 @@ public class DbStateBackend extends AbstractStateBackend {
 
 		this.rnd = new Random();
 		this.env = env;
-		this.appId = env.getApplicationID().toString().substring(0, 16);
+		this.jobId = env.getJobID().toString().substring(0, 16);
 
 		connections = dbConfig.createShardedConnection();
 
@@ -286,8 +286,8 @@ public class DbStateBackend extends AbstractStateBackend {
 		if (nonPartitionedStateBackend == null) {
 			insertStatement = retry(new Callable<PreparedStatement>() {
 				public PreparedStatement call() throws SQLException {
-					dbAdapter.createCheckpointsTable(appId, getConnections().getFirst());
-					return dbAdapter.prepareCheckpointInsert(appId,
+					dbAdapter.createCheckpointsTable(jobId, getConnections().getFirst());
+					return dbAdapter.prepareCheckpointInsert(jobId,
 							getConnections().getFirst());
 				}
 			}, numSqlRetries, sqlRetrySleep);
@@ -316,7 +316,7 @@ public class DbStateBackend extends AbstractStateBackend {
 	@Override
 	public void disposeAllStateForCurrentJob() throws Exception {
 		if (nonPartitionedStateBackend == null) {
-			dbAdapter.disposeAllStateForJob(appId, connections.getFirst());
+			dbAdapter.disposeAllStateForJob(jobId, connections.getFirst());
 		} else {
 			nonPartitionedStateBackend.disposeAllStateForCurrentJob();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
index 8a4a266..cc42b3f 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateHandle.java
@@ -38,7 +38,7 @@ public class DbStateHandle<S> implements Serializable, StateHandle<S> {
 	private static final long serialVersionUID = 1L;
 	private static final Logger LOG = LoggerFactory.getLogger(DbStateHandle.class);
 
-	private final String appId;
+	private final String jobId;
 	private final DbBackendConfig dbConfig;
 
 	private final long checkpointId;
@@ -49,7 +49,7 @@ public class DbStateHandle<S> implements Serializable, StateHandle<S> {
 	private final long stateSize;
 
 	public DbStateHandle(
-			String appId,
+			String jobId,
 			long checkpointId,
 			long checkpointTs,
 			long handleId,
@@ -58,7 +58,7 @@ public class DbStateHandle<S> implements Serializable, StateHandle<S> {
 
 		this.checkpointId = checkpointId;
 		this.handleId = handleId;
-		this.appId = appId;
+		this.jobId = jobId;
 		this.dbConfig = dbConfig;
 		this.checkpointTs = checkpointTs;
 		this.stateSize = stateSize;
@@ -68,7 +68,7 @@ public class DbStateHandle<S> implements Serializable, StateHandle<S> {
 		return retry(new Callable<byte[]>() {
 			public byte[] call() throws Exception {
 				try (ShardedConnection con = dbConfig.createShardedConnection()) {
-					return dbConfig.getDbAdapter().getCheckpoint(appId, con.getFirst(), checkpointId, checkpointTs, handleId);
+					return dbConfig.getDbAdapter().getCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId);
 				}
 			}
 		}, dbConfig.getMaxNumberOfSqlRetries(), dbConfig.getSleepBetweenSqlRetries());
@@ -80,7 +80,7 @@ public class DbStateHandle<S> implements Serializable, StateHandle<S> {
 			retry(new Callable<Boolean>() {
 				public Boolean call() throws Exception {
 					try (ShardedConnection con = dbConfig.createShardedConnection()) {
-						dbConfig.getDbAdapter().deleteCheckpoint(appId, con.getFirst(), checkpointId, checkpointTs, handleId);
+						dbConfig.getDbAdapter().deleteCheckpoint(jobId, con.getFirst(), checkpointId, checkpointTs, handleId);
 					}
 					return true;
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
index cf2b5be..7c71b51 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/MySqlAdapter.java
@@ -45,10 +45,10 @@ public class MySqlAdapter implements DbAdapter {
 	// -----------------------------------------------------------------------------
 
 	@Override
-	public void createCheckpointsTable(String appId, Connection con) throws SQLException {
+	public void createCheckpointsTable(String jobId, Connection con) throws SQLException {
 		try (Statement smt = con.createStatement()) {
 			smt.executeUpdate(
-					"CREATE TABLE IF NOT EXISTS checkpoints_" + appId
+					"CREATE TABLE IF NOT EXISTS checkpoints_" + jobId
 							+ " ("
 							+ "checkpointId bigint, "
 							+ "timestamp bigint, "
@@ -61,14 +61,14 @@ public class MySqlAdapter implements DbAdapter {
 	}
 
 	@Override
-	public PreparedStatement prepareCheckpointInsert(String appId, Connection con) throws SQLException {
+	public PreparedStatement prepareCheckpointInsert(String jobId, Connection con) throws SQLException {
 		return con.prepareStatement(
-				"INSERT INTO checkpoints_" + appId
+				"INSERT INTO checkpoints_" + jobId
 						+ " (checkpointId, timestamp, handleId, checkpoint) VALUES (?,?,?,?)");
 	}
 
 	@Override
-	public void setCheckpointInsertParams(String appId, PreparedStatement insertStatement, long checkpointId,
+	public void setCheckpointInsertParams(String jobId, PreparedStatement insertStatement, long checkpointId,
 										long timestamp, long handleId, byte[] checkpoint) throws SQLException {
 		insertStatement.setLong(1, checkpointId);
 		insertStatement.setLong(2, timestamp);
@@ -77,11 +77,11 @@ public class MySqlAdapter implements DbAdapter {
 	}
 
 	@Override
-	public byte[] getCheckpoint(String appId, Connection con, long checkpointId, long checkpointTs, long handleId)
+	public byte[] getCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
 			throws SQLException {
 		try (Statement smt = con.createStatement()) {
 			ResultSet rs = smt.executeQuery(
-					"SELECT checkpoint FROM checkpoints_" + appId
+					"SELECT checkpoint FROM checkpoints_" + jobId
 							+ " WHERE handleId = " + handleId);
 			if (rs.next()) {
 				return rs.getBytes(1);
@@ -92,20 +92,20 @@ public class MySqlAdapter implements DbAdapter {
 	}
 
 	@Override
-	public void deleteCheckpoint(String appId, Connection con, long checkpointId, long checkpointTs, long handleId)
+	public void deleteCheckpoint(String jobId, Connection con, long checkpointId, long checkpointTs, long handleId)
 			throws SQLException {
 		try (Statement smt = con.createStatement()) {
 			smt.executeUpdate(
-					"DELETE FROM checkpoints_" + appId
+					"DELETE FROM checkpoints_" + jobId
 							+ " WHERE handleId = " + handleId);
 		}
 	}
 
 	@Override
-	public void disposeAllStateForJob(String appId, Connection con) throws SQLException {
+	public void disposeAllStateForJob(String jobId, Connection con) throws SQLException {
 		try (Statement smt = con.createStatement()) {
 			smt.executeUpdate(
-					"DROP TABLE checkpoints_" + appId);
+					"DROP TABLE checkpoints_" + jobId);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
index 91375e4..adef7db 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
@@ -136,15 +136,15 @@ public class DbStateBackendTest {
 
 		Environment env = new DummyEnvironment("test", 1, 0);
 		backend.initializeForJob(env, "dummy-setup-ser", StringSerializer.INSTANCE);
-		String appId = env.getApplicationID().toString().substring(0, 16);
+		String jobId = env.getJobID().toString().substring(0, 16);
 
 		assertNotNull(backend.getConnections());
 		assertTrue(
-				isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + appId));
+				isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + jobId));
 
 		backend.disposeAllStateForCurrentJob();
 		assertFalse(
-				isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + appId));
+				isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + jobId));
 		backend.close();
 
 		assertTrue(backend.getConnections().getFirst().isClosed());
@@ -154,7 +154,7 @@ public class DbStateBackendTest {
 	public void testSerializableState() throws Exception {
 		Environment env = new DummyEnvironment("test", 1, 0);
 		DbStateBackend backend = new DbStateBackend(conf);
-		String appId = env.getApplicationID().toString().substring(0, 16);
+		String jobId = env.getJobID().toString().substring(0, 16);
 
 		backend.initializeForJob(env, "dummy-ser-state", StringSerializer.INSTANCE);
 
@@ -175,12 +175,12 @@ public class DbStateBackendTest {
 		assertEquals(state2, handle2.getState(getClass().getClassLoader()));
 		handle2.discardState();
 
-		assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + appId));
+		assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + jobId));
 
 		assertEquals(state3, handle3.getState(getClass().getClassLoader()));
 		handle3.discardState();
 
-		assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + appId));
+		assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + jobId));
 
 		backend.close();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
index 53d8d50..5331956 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
@@ -42,11 +42,11 @@ public class DerbyAdapter extends MySqlAdapter {
 	 * "IF NOT EXISTS" clause at table creation
 	 */
 	@Override
-	public void createCheckpointsTable(String appId, Connection con) throws SQLException {
+	public void createCheckpointsTable(String jobId, Connection con) throws SQLException {
 
 		try (Statement smt = con.createStatement()) {
 			smt.executeUpdate(
-					"CREATE TABLE checkpoints_" + appId
+					"CREATE TABLE checkpoints_" + jobId
 							+ " ("
 							+ "checkpointId bigint, "
 							+ "timestamp bigint, "

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-core/src/main/java/org/apache/flink/api/common/ApplicationID.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ApplicationID.java b/flink-core/src/main/java/org/apache/flink/api/common/ApplicationID.java
deleted file mode 100644
index c047dfe..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/ApplicationID.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.util.AbstractID;
-
-/**
- * Identifier for a Flink application.
- *
- * <p>This is used to as a identifier across job submissions, which have changing job IDs (for
- * example after resuming an application). This is set for the execution graph.
- */
-@PublicEvolving
-public final class ApplicationID extends AbstractID {
-
-	private static final long serialVersionUID = 1L;
-
-	public ApplicationID() {
-		super();
-	}
-
-	public ApplicationID(byte[] bytes) {
-		super(bytes);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Savepoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Savepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Savepoint.java
deleted file mode 100644
index 96e8950..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Savepoint.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.api.common.ApplicationID;
-
-import java.io.Serializable;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A {@link CompletedCheckpoint} instance with the {@link ApplicationID} of the program it belongs
- * to.
- */
-public class Savepoint implements Serializable {
-
-	private static final long serialVersionUID = 840132134745425068L;
-
-	private final ApplicationID appId;
-
-	private final CompletedCheckpoint completedCheckpoint;
-
-	public Savepoint(ApplicationID appId, CompletedCheckpoint completedCheckpoint) {
-
-		this.appId = checkNotNull(appId);
-		this.completedCheckpoint = checkNotNull(completedCheckpoint);
-	}
-
-	public ApplicationID getApplicationId() {
-		return appId;
-	}
-
-	public CompletedCheckpoint getCompletedCheckpoint() {
-		return completedCheckpoint;
-	}
-
-	@Override
-	public String toString() {
-		return "Savepoint (appId=" + appId + ", checkpoint=" + completedCheckpoint + ")";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
index ea4b8ae..5638e78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.checkpoint;
 
 import akka.actor.ActorSystem;
 import akka.actor.Props;
-import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -67,20 +66,13 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 
 	private static final Logger LOG = LoggerFactory.getLogger(SavepointCoordinator.class);
 
-	/**
-	 * The application ID of the job this coordinator belongs to. This is updated on reset to an
-	 * old savepoint.
-	 */
-	private ApplicationID appId;
-
 	/** Store for savepoints. */
-	private StateStore<Savepoint> savepointStore;
+	private StateStore<CompletedCheckpoint> savepointStore;
 
 	/** Mapping from checkpoint ID to promises for savepoints. */
 	private final Map<Long, Promise<String>> savepointPromises;
 
 	public SavepointCoordinator(
-			ApplicationID appId,
 			JobID jobId,
 			long baseInterval,
 			long checkpointTimeout,
@@ -89,7 +81,7 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 			ExecutionVertex[] tasksToCommitTo,
 			ClassLoader userClassLoader,
 			CheckpointIDCounter checkpointIDCounter,
-			StateStore<Savepoint> savepointStore,
+			StateStore<CompletedCheckpoint> savepointStore,
 			CheckpointStatsTracker statsTracker) throws Exception {
 
 		super(jobId,
@@ -106,7 +98,6 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 				RecoveryMode.STANDALONE,
 				statsTracker);
 
-		this.appId = checkNotNull(appId);
 		this.savepointStore = checkNotNull(savepointStore);
 		this.savepointPromises = new ConcurrentHashMap<>();
 	}
@@ -169,12 +160,11 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 	 *
 	 * @param tasks         Tasks that will possibly be reset
 	 * @param savepointPath The path of the savepoint to rollback to
-	 * @return The application ID of the rolled back savepoint
 	 * @throws IllegalStateException If coordinator is shut down
 	 * @throws IllegalStateException If mismatch between program and savepoint state
 	 * @throws Exception             If savepoint store failure
 	 */
-	public ApplicationID restoreSavepoint(
+	public void restoreSavepoint(
 			Map<JobVertexID, ExecutionJobVertex> tasks,
 			String savepointPath) throws Exception {
 
@@ -189,9 +179,7 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 
 			LOG.info("Rolling back to savepoint '{}'.", savepointPath);
 
-			Savepoint savepoint = savepointStore.getState(savepointPath);
-
-			CompletedCheckpoint checkpoint = savepoint.getCompletedCheckpoint();
+			CompletedCheckpoint checkpoint = savepointStore.getState(savepointPath);
 
 			LOG.info("Savepoint: {}@{}", checkpoint.getCheckpointID(), checkpoint.getTimestamp());
 
@@ -207,7 +195,7 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 					String msg = String.format("Failed to rollback to savepoint %s. " +
 							"Cannot map old state for task %s to the new program. " +
 							"This indicates that the program has been changed in a " +
-							"non-compatible way  after the savepoint.", savepoint,
+							"non-compatible way  after the savepoint.", checkpoint,
 							state.getOperatorId());
 					throw new IllegalStateException(msg);
 				}
@@ -217,7 +205,7 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 							"Parallelism mismatch between savepoint state and new program. " +
 							"Cannot map subtask %d of operator %s to new program with " +
 							"parallelism %d. This indicates that the program has been changed " +
-							"in a non-compatible way after the savepoint.", savepoint,
+							"in a non-compatible way after the savepoint.", checkpoint,
 							state.getSubtask(), state.getOperatorId(), vertex.getParallelism());
 					throw new IllegalStateException(msg);
 				}
@@ -233,11 +221,6 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 			checkpointIdCounter.start();
 			checkpointIdCounter.setCount(nextCheckpointId + 1);
 			LOG.info("Reset the checkpoint ID to {}", nextCheckpointId);
-
-			this.appId = savepoint.getApplicationId();
-			LOG.info("Reset the application ID to {}", appId);
-
-			return appId;
 		}
 	}
 
@@ -276,8 +259,7 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 
 		try {
 			// Save the checkpoint
-			String savepointPath = savepointStore.putState(
-					new Savepoint(appId, checkpoint));
+			String savepointPath = savepointStore.putState(checkpoint);
 			promise.success(savepointPath);
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStore.java
index 001674a..4bdbee0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStore.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.checkpoint;
 /**
  * Simple wrapper around the state store for savepoints.
  */
-public class SavepointStore implements StateStore<Savepoint> {
+public class SavepointStore implements StateStore<CompletedCheckpoint> {
 
-	private final StateStore<Savepoint> stateStore;
+	private final StateStore<CompletedCheckpoint> stateStore;
 
-	public SavepointStore(StateStore<Savepoint> stateStore) {
+	public SavepointStore(StateStore<CompletedCheckpoint> stateStore) {
 		this.stateStore = stateStore;
 	}
 
@@ -34,10 +34,10 @@ public class SavepointStore implements StateStore<Savepoint> {
 
 	public void stop() {
 		if (stateStore instanceof HeapStateStore) {
-			HeapStateStore<Savepoint> heapStateStore = (HeapStateStore<Savepoint>) stateStore;
+			HeapStateStore<CompletedCheckpoint> heapStateStore = (HeapStateStore<CompletedCheckpoint>) stateStore;
 
-			for (Savepoint savepoint : heapStateStore.getAll()) {
-				savepoint.getCompletedCheckpoint().discard(ClassLoader.getSystemClassLoader());
+			for (CompletedCheckpoint savepoint : heapStateStore.getAll()) {
+				savepoint.discard(ClassLoader.getSystemClassLoader());
 			}
 
 			heapStateStore.clearAll();
@@ -45,12 +45,12 @@ public class SavepointStore implements StateStore<Savepoint> {
 	}
 
 	@Override
-	public String putState(Savepoint state) throws Exception {
+	public String putState(CompletedCheckpoint state) throws Exception {
 		return stateStore.putState(state);
 	}
 
 	@Override
-	public Savepoint getState(String path) throws Exception {
+	public CompletedCheckpoint getState(String path) throws Exception {
 		return stateStore.getState(path);
 	}
 
@@ -59,7 +59,7 @@ public class SavepointStore implements StateStore<Savepoint> {
 		stateStore.disposeState(path);
 	}
 
-	StateStore<Savepoint> getStateStore() {
+	StateStore<CompletedCheckpoint> getStateStore() {
 		return stateStore;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactory.java
index 2e14971..04a3227 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactory.java
@@ -109,11 +109,11 @@ public class SavepointStoreFactory {
 	// ------------------------------------------------------------------------
 
 	private static SavepointStore createJobManagerSavepointStore() {
-		return new SavepointStore(new HeapStateStore<Savepoint>());
+		return new SavepointStore(new HeapStateStore<CompletedCheckpoint>());
 	}
 
 	private static SavepointStore createFileSystemSavepointStore(String rootPath) throws IOException {
-		return new SavepointStore(new FileSystemStateStore<Savepoint>(rootPath, "savepoint-"));
+		return new SavepointStore(new FileSystemStateStore<CompletedCheckpoint>(rootPath, "savepoint-"));
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 912a0ce..983ad38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.deployment;
 
-import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
@@ -43,9 +42,6 @@ public final class TaskDeploymentDescriptor implements Serializable {
 
 	private static final long serialVersionUID = -3233562176034358530L;
 
-	/** The ID of the application the tasks belongs to. */
-	private final ApplicationID appId;
-
 	/** The ID of the job the tasks belongs to. */
 	private final JobID jobID;
 
@@ -98,7 +94,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	 * Constructs a task deployment descriptor.
 	 */
 	public TaskDeploymentDescriptor(
-			ApplicationID appId, JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionId,
+			JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionId,
 			String taskName, int indexInSubtaskGroup, int numberOfSubtasks, int attemptNumber,
 			Configuration jobConfiguration, Configuration taskConfiguration, String invokableClassName,
 			List<ResultPartitionDeploymentDescriptor> producedPartitions,
@@ -112,7 +108,6 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		checkArgument(targetSlotNumber >= 0);
 		checkArgument(attemptNumber >= 0);
 
-		this.appId = checkNotNull(appId);
 		this.jobID = checkNotNull(jobID);
 		this.vertexID = checkNotNull(vertexID);
 		this.executionId = checkNotNull(executionId);
@@ -133,7 +128,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	}
 
 	public TaskDeploymentDescriptor(
-			ApplicationID appId, JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionId,
+			JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionId,
 			String taskName, int indexInSubtaskGroup, int numberOfSubtasks, int attemptNumber,
 			Configuration jobConfiguration, Configuration taskConfiguration, String invokableClassName,
 			List<ResultPartitionDeploymentDescriptor> producedPartitions,
@@ -141,19 +136,12 @@ public final class TaskDeploymentDescriptor implements Serializable {
 			List<BlobKey> requiredJarFiles, List<URL> requiredClasspaths,
 			int targetSlotNumber) {
 
-		this(appId, jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks, attemptNumber,
+		this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks, attemptNumber,
 				jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
 				inputGates, requiredJarFiles, requiredClasspaths, targetSlotNumber, null, -1);
 	}
 
 	/**
-	 * Returns the ID of the application the tasks belongs to.
-	 */
-	public ApplicationID getApplicationID() {
-		return appId;
-	}
-
-	/**
 	 * Returns the ID of the job the tasks belongs to.
 	 */
 	public JobID getJobID() {

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index fef7d0e..7332151 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.execution;
 
 import org.apache.flink.api.common.TaskInfo;
-import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -47,16 +46,6 @@ import java.util.concurrent.Future;
 public interface Environment {
 
 	/**
-	 * Returns the ID of the application the task belongs to.
-	 *
-	 * <p>This ID stays the same across job submissions after resuming an application from a
-	 * savepoint.
-	 *
-	 * @return The ID of the application the task belongs to
-	 */
-	ApplicationID getApplicationID();
-
-	/**
 	 * Returns the ID of the job that the task belongs to.
 	 *
 	 * @return the ID of the job from the original job graph

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 7d83ae2..9a6eb85 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.executiongraph;
 
 import akka.actor.ActorSystem;
 
-import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -34,8 +33,8 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.Savepoint;
 import org.apache.flink.runtime.checkpoint.SavepointCoordinator;
 import org.apache.flink.runtime.checkpoint.StateStore;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
@@ -131,11 +130,6 @@ public class ExecutionGraph implements Serializable {
 	 * within the job. */
 	private final SerializableObject progressLock = new SerializableObject();
 
-	/** The ID of the application this graph has been built for. This is
-	 * generated when the graph is created and reset if necessary (currently
-	 * only after {@link #restoreSavepoint(String)}). */
-	private ApplicationID appId = new ApplicationID();
-
 	/** The ID of the job this graph has been built for. */
 	private final JobID jobID;
 
@@ -359,7 +353,7 @@ public class ExecutionGraph implements Serializable {
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore completedCheckpointStore,
 			RecoveryMode recoveryMode,
-			StateStore<Savepoint> savepointStore) throws Exception {
+			StateStore<CompletedCheckpoint> savepointStore) throws Exception {
 
 		// simple sanity checks
 		if (interval < 10 || checkpointTimeout < 10) {
@@ -414,7 +408,6 @@ public class ExecutionGraph implements Serializable {
 
 		// Savepoint Coordinator
 		savepointCoordinator = new SavepointCoordinator(
-				appId,
 				jobID,
 				interval,
 				checkpointTimeout,
@@ -525,10 +518,6 @@ public class ExecutionGraph implements Serializable {
 		return scheduler;
 	}
 
-	public ApplicationID getApplicationID() {
-		return appId;
-	}
-
 	public JobID getJobID() {
 		return jobID;
 	}
@@ -916,9 +905,6 @@ public class ExecutionGraph implements Serializable {
 	 * this method. The operation might block. Make sure that calls don't block the job manager
 	 * actor.
 	 *
-	 * <p><strong>Note</strong>: a call to this method changes the {@link #appId} of the execution
-	 * graph if the operation is successful.
-	 *
 	 * @param savepointPath The path of the savepoint to rollback to.
 	 * @throws IllegalStateException If checkpointing is disabled
 	 * @throws IllegalStateException If checkpoint coordinator is shut down
@@ -929,11 +915,8 @@ public class ExecutionGraph implements Serializable {
 			if (savepointCoordinator != null) {
 				LOG.info("Restoring savepoint: " + savepointPath + ".");
 
-				ApplicationID oldAppId = appId;
-				this.appId = savepointCoordinator.restoreSavepoint(
+				savepointCoordinator.restoreSavepoint(
 						getAllVertices(), savepointPath);
-
-				LOG.info("Set application ID to {} (from: {}).", appId, oldAppId);
 			}
 			else {
 				// Sanity check

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index bc368ab..7d4be79 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.accumulators.LongCounter;
@@ -188,10 +187,6 @@ public class ExecutionJobVertex implements Serializable {
 		return parallelism;
 	}
 
-	public ApplicationID getApplicationID() {
-		return graph.getApplicationID();
-	}
-
 	public JobID getJobId() {
 		return graph.getJobID();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e522c8b..f2d30b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
@@ -155,10 +154,6 @@ public class ExecutionVertex implements Serializable {
 	//  Properties
 	// --------------------------------------------------------------------------------------------
 
-	public ApplicationID getApplicationId() {
-		return this.jobVertex.getApplicationID();
-	}
-
 	public JobID getJobId() {
 		return this.jobVertex.getJobId();
 	}
@@ -673,7 +668,7 @@ public class ExecutionVertex implements Serializable {
 		List<BlobKey> jarFiles = getExecutionGraph().getRequiredJarFiles();
 		List<URL> classpaths = getExecutionGraph().getRequiredClasspaths();
 
-		return new TaskDeploymentDescriptor(getApplicationId(), getJobId(), getJobvertexId(), executionId, getTaskName(),
+		return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), executionId, getTaskName(),
 				subTaskIndex, getTotalNumberOfParallelSubtasks(), attemptNumber, getExecutionGraph().getJobConfiguration(),
 				jobVertex.getJobVertex().getConfiguration(), jobVertex.getJobVertex().getInvokableClassName(),
 				producedPartitions, consumedPartitions, jarFiles, classpaths, targetSlot.getRoot().getSlotNumber(),

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 0b33942..0fddde4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
@@ -48,7 +47,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
  */
 public class RuntimeEnvironment implements Environment {
 
-	private final ApplicationID appId;
 	private final JobID jobId;
 	private final JobVertexID jobVertexId;
 	private final ExecutionAttemptID executionId;
@@ -79,7 +77,6 @@ public class RuntimeEnvironment implements Environment {
 	// ------------------------------------------------------------------------
 
 	public RuntimeEnvironment(
-			ApplicationID appId,
 			JobID jobId,
 			JobVertexID jobVertexId,
 			ExecutionAttemptID executionId,
@@ -98,7 +95,6 @@ public class RuntimeEnvironment implements Environment {
 			ActorGateway jobManager,
 			TaskManagerRuntimeInfo taskManagerInfo) {
 
-		this.appId = checkNotNull(appId);
 		this.jobId = checkNotNull(jobId);
 		this.jobVertexId = checkNotNull(jobVertexId);
 		this.executionId = checkNotNull(executionId);
@@ -121,11 +117,6 @@ public class RuntimeEnvironment implements Environment {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public ApplicationID getApplicationID() {
-		return appId;
-	}
-
-	@Override
 	public JobID getJobID() {
 		return jobId;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 81dc01f..f2d6025 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-
-import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.cache.DistributedCache;
@@ -116,9 +114,6 @@ public class Task implements Runnable {
 	//  Constant fields that are part of the initial Task construction
 	// ------------------------------------------------------------------------
 
-	/** The application that the task belongs to */
-	private final ApplicationID appId;
-
 	/** The job that the task belongs to */
 	private final JobID jobId;
 
@@ -239,7 +234,6 @@ public class Task implements Runnable {
 				TaskManagerRuntimeInfo taskManagerConfig)
 	{
 		this.taskInfo = checkNotNull(tdd.getTaskInfo());
-		this.appId = checkNotNull(tdd.getApplicationID());
 		this.jobId = checkNotNull(tdd.getJobID());
 		this.vertexId = checkNotNull(tdd.getVertexID());
 		this.executionId  = checkNotNull(tdd.getExecutionId());
@@ -499,7 +493,7 @@ public class Task implements Runnable {
 			TaskInputSplitProvider splitProvider = new TaskInputSplitProvider(jobManager,
 					jobId, vertexId, executionId, userCodeClassLoader, actorAskTimeout);
 
-			Environment env = new RuntimeEnvironment(appId, jobId, vertexId, executionId, taskInfo,
+			Environment env = new RuntimeEnvironment(jobId, vertexId, executionId, taskInfo,
 					jobConfiguration, taskConfiguration,
 					userCodeClassLoader, memoryManager, ioManager,
 					broadcastVariableManager, accumulatorRegistry,

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1a16225..f3b3883 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -632,7 +632,7 @@ class JobManager(
           log.debug(s"$savepoint")
 
           // Discard the associated checkpoint
-          savepoint.getCompletedCheckpoint.discard(getClass.getClassLoader)
+          savepoint.discard(getClass.getClassLoader)
 
           // Dispose the savepoint
           savepointStore.disposeState(savepointPath)

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 26a626e..5d83bf2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -70,7 +70,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 					new StandaloneCheckpointIDCounter(),
 					new StandaloneCompletedCheckpointStore(1, ClassLoader.getSystemClassLoader()),
 					RecoveryMode.STANDALONE,
-					new HeapStateStore<Savepoint>());
+					new HeapStateStore<CompletedCheckpoint>());
 
 			CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
 			SavepointCoordinator savepointCoordinator = executionGraph.getSavepointCoordinator();

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
index 6bbdf62..b3bdd9f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -82,7 +81,6 @@ public class SavepointCoordinatorTest {
 	 */
 	@Test
 	public void testSimpleTriggerSavepoint() throws Exception {
-		ApplicationID appId = new ApplicationID();
 		JobID jobId = new JobID();
 		long checkpointTimeout = 60 * 1000;
 		long timestamp = 1272635;
@@ -90,10 +88,9 @@ public class SavepointCoordinatorTest {
 				mockExecutionVertex(jobId),
 				mockExecutionVertex(jobId) };
 		MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter();
-		HeapStateStore<Savepoint> savepointStore = new HeapStateStore<>();
+		HeapStateStore<CompletedCheckpoint> savepointStore = new HeapStateStore<>();
 
 		SavepointCoordinator coordinator = createSavepointCoordinator(
-				appId,
 				jobId,
 				checkpointTimeout,
 				vertices,
@@ -141,8 +138,8 @@ public class SavepointCoordinatorTest {
 		String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero());
 
 		// Verify the savepoint
-		Savepoint savepoint = savepointStore.getState(savepointPath);
-		verifySavepoint(savepoint, appId, jobId, checkpointId, timestamp,
+		CompletedCheckpoint savepoint = savepointStore.getState(savepointPath);
+		verifySavepoint(savepoint, jobId, checkpointId, timestamp,
 				vertices);
 
 		// Verify all promises removed
@@ -158,7 +155,6 @@ public class SavepointCoordinatorTest {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testSimpleRollbackSavepoint() throws Exception {
-		ApplicationID appId = new ApplicationID();
 		JobID jobId = new JobID();
 
 		ExecutionJobVertex[] jobVertices = new ExecutionJobVertex[] {
@@ -176,10 +172,9 @@ public class SavepointCoordinatorTest {
 		}
 
 		MockCheckpointIdCounter idCounter = new MockCheckpointIdCounter();
-		StateStore<Savepoint> savepointStore = new HeapStateStore<>();
+		StateStore<CompletedCheckpoint> savepointStore = new HeapStateStore<>();
 
 		SavepointCoordinator coordinator = createSavepointCoordinator(
-				appId,
 				jobId,
 				60 * 1000,
 				triggerVertices,
@@ -201,9 +196,7 @@ public class SavepointCoordinatorTest {
 		assertNotNull(savepointPath);
 
 		// Rollback
-		assertEquals(appId, coordinator.restoreSavepoint(
-				createExecutionJobVertexMap(jobVertices),
-				savepointPath));
+		coordinator.restoreSavepoint(createExecutionJobVertexMap(jobVertices), savepointPath);
 
 		// Verify all executions have been reset
 		for (ExecutionVertex vertex : ackVertices) {
@@ -222,7 +215,6 @@ public class SavepointCoordinatorTest {
 
 	@Test
 	public void testRollbackParallelismMismatch() throws Exception {
-		ApplicationID appId = new ApplicationID();
 		JobID jobId = new JobID();
 
 		ExecutionJobVertex[] jobVertices = new ExecutionJobVertex[] {
@@ -239,10 +231,9 @@ public class SavepointCoordinatorTest {
 			}
 		}
 
-		StateStore<Savepoint> savepointStore = new HeapStateStore<>();
+		StateStore<CompletedCheckpoint> savepointStore = new HeapStateStore<>();
 
 		SavepointCoordinator coordinator = createSavepointCoordinator(
-				appId,
 				jobId,
 				60 * 1000,
 				triggerVertices,
@@ -287,14 +278,12 @@ public class SavepointCoordinatorTest {
 
 	@Test
 	public void testRollbackStateStoreFailure() throws Exception {
-		ApplicationID appId = new ApplicationID();
 		JobID jobId = new JobID();
 		ExecutionJobVertex jobVertex = mockExecutionJobVertex(jobId, new JobVertexID(), 4);
-		HeapStateStore<Savepoint> savepointStore = spy(
-				new HeapStateStore<Savepoint>());
+		HeapStateStore<CompletedCheckpoint> savepointStore = spy(
+				new HeapStateStore<CompletedCheckpoint>());
 
 		SavepointCoordinator coordinator = createSavepointCoordinator(
-				appId,
 				jobId,
 				60 * 1000,
 				jobVertex.getTaskVertices(),
@@ -336,50 +325,17 @@ public class SavepointCoordinatorTest {
 	}
 
 	@Test
-	public void testRollbackUpdatesApplicationID() throws Exception {
-		ApplicationID appId = new ApplicationID();
-
-		CompletedCheckpoint checkpoint = mock(CompletedCheckpoint.class);
-		when(checkpoint.getStates()).thenReturn(Collections.<StateForTask>emptyList());
-		when(checkpoint.getCheckpointID()).thenReturn(12312312L);
-
-		Savepoint savepoint = new Savepoint(appId, checkpoint);
-
-		StateStore<Savepoint> savepointStore = mock(StateStore.class);
-		when(savepointStore.getState(anyString())).thenReturn(savepoint);
-
-		SavepointCoordinator coordinator = createSavepointCoordinator(
-				new ApplicationID(),
-				new JobID(),
-				60 * 1000,
-				new ExecutionVertex[] {},
-				new ExecutionVertex[] {},
-				new ExecutionVertex[] {},
-				new MockCheckpointIdCounter(),
-				savepointStore);
-
-		assertEquals(appId, coordinator.restoreSavepoint(createExecutionJobVertexMap(), "any"));
-
-		coordinator.shutdown();
-	}
-
-	@Test
 	public void testRollbackSetsCheckpointID() throws Exception {
-		ApplicationID appId = new ApplicationID();
-
-		CompletedCheckpoint checkpoint = mock(CompletedCheckpoint.class);
-		when(checkpoint.getStates()).thenReturn(Collections.<StateForTask>emptyList());
-		when(checkpoint.getCheckpointID()).thenReturn(12312312L);
-
-		Savepoint savepoint = new Savepoint(appId, checkpoint);
+		CompletedCheckpoint savepoint = mock(CompletedCheckpoint.class);
+		when(savepoint.getStates()).thenReturn(Collections.<StateForTask>emptyList());
+		when(savepoint.getCheckpointID()).thenReturn(12312312L);
 
 		CheckpointIDCounter checkpointIdCounter = mock(CheckpointIDCounter.class);
 
-		StateStore<Savepoint> savepointStore = mock(StateStore.class);
+		StateStore<CompletedCheckpoint> savepointStore = mock(StateStore.class);
 		when(savepointStore.getState(anyString())).thenReturn(savepoint);
 
 		SavepointCoordinator coordinator = createSavepointCoordinator(
-				new ApplicationID(),
 				new JobID(),
 				60 * 1000,
 				new ExecutionVertex[] {},
@@ -388,7 +344,7 @@ public class SavepointCoordinatorTest {
 				checkpointIdCounter,
 				savepointStore);
 
-		assertEquals(appId, coordinator.restoreSavepoint(createExecutionJobVertexMap(), "any"));
+		coordinator.restoreSavepoint(createExecutionJobVertexMap(), "any");
 
 		verify(checkpointIdCounter).setCount(eq(12312312L + 1));
 
@@ -401,7 +357,6 @@ public class SavepointCoordinatorTest {
 
 	@Test
 	public void testAbortSavepointIfTriggerTasksNotExecuted() throws Exception {
-		ApplicationID appId = new ApplicationID();
 		JobID jobId = new JobID();
 		ExecutionVertex[] triggerVertices = new ExecutionVertex[] {
 				mock(ExecutionVertex.class),
@@ -411,14 +366,13 @@ public class SavepointCoordinatorTest {
 				mockExecutionVertex(jobId) };
 
 		SavepointCoordinator coordinator = createSavepointCoordinator(
-				appId,
 				jobId,
 				60 * 1000,
 				triggerVertices,
 				ackVertices,
 				new ExecutionVertex[] {},
 				new MockCheckpointIdCounter(),
-				new HeapStateStore<Savepoint>());
+				new HeapStateStore<CompletedCheckpoint>());
 
 		// Trigger savepoint
 		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1238123);
@@ -441,7 +395,6 @@ public class SavepointCoordinatorTest {
 
 	@Test
 	public void testAbortSavepointIfTriggerTasksAreFinished() throws Exception {
-		ApplicationID appId = new ApplicationID();
 		JobID jobId = new JobID();
 		ExecutionVertex[] triggerVertices = new ExecutionVertex[] {
 				mockExecutionVertex(jobId),
@@ -451,14 +404,13 @@ public class SavepointCoordinatorTest {
 				mockExecutionVertex(jobId) };
 
 		SavepointCoordinator coordinator = createSavepointCoordinator(
-				appId,
 				jobId,
 				60 * 1000,
 				triggerVertices,
 				ackVertices,
 				new ExecutionVertex[] {},
 				new MockCheckpointIdCounter(),
-				new HeapStateStore<Savepoint>());
+				new HeapStateStore<CompletedCheckpoint>());
 
 		// Trigger savepoint
 		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1238123);
@@ -481,7 +433,6 @@ public class SavepointCoordinatorTest {
 
 	@Test
 	public void testAbortSavepointIfAckTasksAreNotExecuted() throws Exception {
-		ApplicationID appId = new ApplicationID();
 		JobID jobId = new JobID();
 		ExecutionVertex[] triggerVertices = new ExecutionVertex[] {
 				mockExecutionVertex(jobId),
@@ -491,14 +442,13 @@ public class SavepointCoordinatorTest {
 				mock(ExecutionVertex.class) };
 
 		SavepointCoordinator coordinator = createSavepointCoordinator(
-				appId,
 				jobId,
 				60 * 1000,
 				triggerVertices,
 				ackVertices,
 				new ExecutionVertex[] {},
 				new MockCheckpointIdCounter(),
-				new HeapStateStore<Savepoint>());
+				new HeapStateStore<CompletedCheckpoint>());
 
 		// Trigger savepoint
 		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1238123);
@@ -521,7 +471,6 @@ public class SavepointCoordinatorTest {
 
 	@Test
 	public void testAbortOnCheckpointTimeout() throws Exception {
-		ApplicationID appId = new ApplicationID();
 		JobID jobId = new JobID();
 		ExecutionVertex[] vertices = new ExecutionVertex[] {
 				mockExecutionVertex(jobId),
@@ -530,14 +479,13 @@ public class SavepointCoordinatorTest {
 		MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter();
 
 		SavepointCoordinator coordinator = createSavepointCoordinator(
-				appId,
 				jobId,
 				20,
 				vertices,
 				vertices,
 				new ExecutionVertex[] { commitVertex },
 				checkpointIdCounter,
-				new HeapStateStore<Savepoint>());
+				new HeapStateStore<CompletedCheckpoint>());
 
 		// Trigger the savepoint
 		Future<String> savepointPathFuture = coordinator.triggerSavepoint(12731273);
@@ -590,21 +538,19 @@ public class SavepointCoordinatorTest {
 
 	@Test
 	public void testAbortSavepointsOnShutdown() throws Exception {
-		ApplicationID appId = new ApplicationID();
 		JobID jobId = new JobID();
 		ExecutionVertex[] vertices = new ExecutionVertex[] {
 				mockExecutionVertex(jobId),
 				mockExecutionVertex(jobId) };
 
 		SavepointCoordinator coordinator = createSavepointCoordinator(
-				appId,
 				jobId,
 				60 * 1000,
 				vertices,
 				vertices,
 				vertices,
 				new MockCheckpointIdCounter(),
-				new HeapStateStore<Savepoint>());
+				new HeapStateStore<CompletedCheckpoint>());
 
 		// Trigger savepoints
 		List<Future<String>> savepointPathFutures = new ArrayList<>();
@@ -635,14 +581,12 @@ public class SavepointCoordinatorTest {
 
 	@Test
 	public void testAbortSavepointOnStateStoreFailure() throws Exception {
-		ApplicationID appId = new ApplicationID();
 		JobID jobId = new JobID();
 		ExecutionJobVertex jobVertex = mockExecutionJobVertex(jobId, new JobVertexID(), 4);
-		HeapStateStore<Savepoint> savepointStore = spy(
-				new HeapStateStore<Savepoint>());
+		HeapStateStore<CompletedCheckpoint> savepointStore = spy(
+				new HeapStateStore<CompletedCheckpoint>());
 
 		SavepointCoordinator coordinator = createSavepointCoordinator(
-				appId,
 				jobId,
 				60 * 1000,
 				jobVertex.getTaskVertices(),
@@ -653,7 +597,7 @@ public class SavepointCoordinatorTest {
 
 		// Failure on putState
 		doThrow(new Exception("TestException"))
-				.when(savepointStore).putState(any(Savepoint.class));
+				.when(savepointStore).putState(any(CompletedCheckpoint.class));
 
 		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1231273123);
 
@@ -679,7 +623,6 @@ public class SavepointCoordinatorTest {
 
 	@Test
 	public void testAbortSavepointIfSubsumed() throws Exception {
-		ApplicationID appId = new ApplicationID();
 		JobID jobId = new JobID();
 		long checkpointTimeout = 60 * 1000;
 		long[] timestamps = new long[] { 1272635, 1272635 + 10 };
@@ -688,10 +631,9 @@ public class SavepointCoordinatorTest {
 				mockExecutionVertex(jobId),
 				mockExecutionVertex(jobId) };
 		MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter();
-		HeapStateStore<Savepoint> savepointStore = new HeapStateStore<>();
+		HeapStateStore<CompletedCheckpoint> savepointStore = new HeapStateStore<>();
 
 		SavepointCoordinator coordinator = createSavepointCoordinator(
-				appId,
 				jobId,
 				checkpointTimeout,
 				vertices,
@@ -751,7 +693,7 @@ public class SavepointCoordinatorTest {
 			verifyNotifyCheckpointComplete(vertex, checkpointIds[1], timestamps[1]);
 		}
 
-		Savepoint[] savepoints = new Savepoint[2];
+		CompletedCheckpoint[] savepoints = new CompletedCheckpoint[2];
 		String[] savepointPaths = new String[2];
 
 		// Verify that the futures have both been completed
@@ -768,7 +710,7 @@ public class SavepointCoordinatorTest {
 		assertTrue(savepointPathFutures.get(1).isCompleted());
 		savepointPaths[1] = Await.result(savepointPathFutures.get(1), FiniteDuration.Zero());
 		savepoints[1] = savepointStore.getState(savepointPaths[1]);
-		verifySavepoint(savepoints[1], appId, jobId, checkpointIds[1], timestamps[1],
+		verifySavepoint(savepoints[1], jobId, checkpointIds[1], timestamps[1],
 				vertices);
 
 		// Verify all promises removed
@@ -779,7 +721,6 @@ public class SavepointCoordinatorTest {
 
 	@Test
 	public void testShutdownDoesNotCleanUpCompletedCheckpointsWithFileSystemStore() throws Exception {
-		ApplicationID appId = new ApplicationID();
 		JobID jobId = new JobID();
 		long checkpointTimeout = 60 * 1000;
 		long timestamp = 1272635;
@@ -792,11 +733,10 @@ public class SavepointCoordinatorTest {
 		final File tmpDir = CommonTestUtils.createTempDirectory();
 
 		try {
-			FileSystemStateStore<Savepoint> savepointStore = new FileSystemStateStore<>(
+			FileSystemStateStore<CompletedCheckpoint> savepointStore = new FileSystemStateStore<>(
 					tmpDir.toURI().toString(), "sp-");
 
 			SavepointCoordinator coordinator = createSavepointCoordinator(
-					appId,
 					jobId,
 					checkpointTimeout,
 					vertices,
@@ -849,8 +789,8 @@ public class SavepointCoordinatorTest {
 			coordinator.shutdown();
 
 			// Verify the savepoint is still available
-			Savepoint savepoint = savepointStore.getState(savepointPath);
-			verifySavepoint(savepoint, appId, jobId, checkpointId, timestamp,
+			CompletedCheckpoint savepoint = savepointStore.getState(savepointPath);
+			verifySavepoint(savepoint, jobId, checkpointId, timestamp,
 					vertices);
 		}
 		finally {
@@ -863,19 +803,17 @@ public class SavepointCoordinatorTest {
 	// ------------------------------------------------------------------------
 
 	private static SavepointCoordinator createSavepointCoordinator(
-			ApplicationID appId,
 			JobID jobId,
 			long checkpointTimeout,
 			ExecutionVertex[] triggerVertices,
 			ExecutionVertex[] ackVertices,
 			ExecutionVertex[] commitVertices,
 			CheckpointIDCounter checkpointIdCounter,
-			StateStore<Savepoint> savepointStore) throws Exception {
+			StateStore<CompletedCheckpoint> savepointStore) throws Exception {
 
 		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
 
 		return new SavepointCoordinator(
-				appId,
 				jobId,
 				checkpointTimeout,
 				checkpointTimeout,
@@ -978,17 +916,14 @@ public class SavepointCoordinatorTest {
 	}
 
 	private static void verifySavepoint(
-			Savepoint savepoint,
-			ApplicationID expectedAppId,
+			CompletedCheckpoint savepoint,
 			JobID expectedJobId,
 			long expectedCheckpointId,
 			long expectedTimestamp,
 			ExecutionVertex[] expectedVertices) throws Exception {
 
-		assertEquals(expectedAppId, savepoint.getApplicationId());
-
 		verifyCompletedCheckpoint(
-				savepoint.getCompletedCheckpoint(),
+				savepoint,
 				expectedJobId,
 				expectedCheckpointId,
 				expectedTimestamp,

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java
index 3d3238d..69b6f81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointStoreFactoryTest.java
@@ -53,7 +53,7 @@ public class SavepointStoreFactoryTest {
 		SavepointStore store = SavepointStoreFactory.createFromConfig(config);
 		assertTrue(store.getStateStore() instanceof FileSystemStateStore);
 
-		FileSystemStateStore<Savepoint> stateStore = (FileSystemStateStore<Savepoint>)
+		FileSystemStateStore<CompletedCheckpoint> stateStore = (FileSystemStateStore<CompletedCheckpoint>)
 				store.getStateStore();
 		assertEquals(new Path(rootPath), stateStore.getRootPath());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 5eeb150..92b642a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -26,7 +26,6 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -41,7 +40,6 @@ public class TaskDeploymentDescriptorTest {
 	@Test
 	public void testSerialization() {
 		try {
-			final ApplicationID appId = new ApplicationID();
 			final JobID jobID = new JobID();
 			final JobVertexID vertexID = new JobVertexID();
 			final ExecutionAttemptID execId = new ExecutionAttemptID();
@@ -57,7 +55,7 @@ public class TaskDeploymentDescriptorTest {
 			final List<BlobKey> requiredJars = new ArrayList<BlobKey>(0);
 			final List<URL> requiredClasspaths = new ArrayList<URL>(0);
 	
-			final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(appId, jobID, vertexID, execId, taskName,
+			final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, vertexID, execId, taskName,
 				indexInSubtaskGroup, currentNumberOfSubtasks, attemptNumber, jobConfiguration, taskConfiguration,
 				invokableClass.getName(), producedResults, inputGates, requiredJars, requiredClasspaths, 47);
 	
@@ -69,7 +67,6 @@ public class TaskDeploymentDescriptorTest {
 			assertFalse(orig.getJobConfiguration() == copy.getJobConfiguration());
 			assertFalse(orig.getTaskConfiguration() == copy.getTaskConfiguration());
 
-			assertEquals(orig.getApplicationID(), copy.getApplicationID());
 			assertEquals(orig.getJobID(), copy.getJobID());
 			assertEquals(orig.getVertexID(), copy.getVertexID());
 			assertEquals(orig.getTaskName(), copy.getTaskName());

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 85e8603..ff6593f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.operators.testutils;
 import java.util.Map;
 import java.util.concurrent.Future;
 
-import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
@@ -42,7 +41,6 @@ import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 public class DummyEnvironment implements Environment {
 
 	private final TaskInfo taskInfo;
-	private final ApplicationID appId = new ApplicationID();
 	private final JobID jobId = new JobID();
 	private final JobVertexID jobVertexId = new JobVertexID();
 
@@ -51,11 +49,6 @@ public class DummyEnvironment implements Environment {
 	}
 
 	@Override
-	public ApplicationID getApplicationID() {
-		return appId;
-	}
-
-	@Override
 	public JobID getJobID() {
 		return jobId;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 0ff03c3..fa97210 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.operators.testutils;
 
-import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
@@ -80,8 +79,6 @@ public class MockEnvironment implements Environment {
 
 	private final List<ResultPartitionWriter> outputs;
 
-	private final ApplicationID appId= new ApplicationID();
-
 	private final JobID jobID = new JobID();
 
 	private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager();
@@ -189,11 +186,6 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
-	public ApplicationID getApplicationID() {
-		return this.appId;
-	}
-
-	@Override
 	public JobID getJobID() {
 		return this.jobID;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 54723ff..e5ff7b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -148,7 +147,7 @@ public class TaskAsyncCallTest {
 		when(networkEnvironment.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
 
 		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-				new ApplicationID(), new JobID(), new JobVertexID(), new ExecutionAttemptID(),
+				new JobID(), new JobVertexID(), new ExecutionAttemptID(),
 				"Test Task", 0, 1, 0,
 				new Configuration(), new Configuration(),
 				CheckpointsInOrderInvokable.class.getName(),

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 28284eb..a014d3d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -23,7 +23,6 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
-import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -57,8 +56,8 @@ import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTr
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
-import org.apache.flink.runtime.messages.TaskMessages.StopTask;
 import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
+import org.apache.flink.runtime.messages.TaskMessages.StopTask;
 import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
@@ -108,8 +107,6 @@ public class TaskManagerTest extends TestLogger {
 
 	final static UUID leaderSessionID = null;
 
-	final static ApplicationID appId = new ApplicationID();
-
 	@BeforeClass
 	public static void setup() {
 		system = AkkaUtils.createLocalActorSystem(new Configuration());
@@ -160,7 +157,7 @@ public class TaskManagerTest extends TestLogger {
 				final JobVertexID vid = new JobVertexID();
 				final ExecutionAttemptID eid = new ExecutionAttemptID();
 
-				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(appId, jid, vid, eid, "TestTask", 2, 7, 0,
+				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7, 0,
 						new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -260,13 +257,13 @@ public class TaskManagerTest extends TestLogger {
 				final ExecutionAttemptID eid1 = new ExecutionAttemptID();
 				final ExecutionAttemptID eid2 = new ExecutionAttemptID();
 
-				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(appId, jid1, vid1, eid1, "TestTask1", 1, 5, 0,
+				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1, "TestTask1", 1, 5, 0,
 						new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
 						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
-				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(appId, jid2, vid2, eid2, "TestTask2", 2, 7, 0,
+				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7, 0,
 						new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -392,13 +389,13 @@ public class TaskManagerTest extends TestLogger {
 				final ExecutionAttemptID eid1 = new ExecutionAttemptID();
 				final ExecutionAttemptID eid2 = new ExecutionAttemptID();
 
-				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(appId, jid1, vid1, eid1, "TestTask1", 1, 5, 0,
+				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1, "TestTask1", 1, 5, 0,
 						new Configuration(), new Configuration(), StoppableInvokable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
 						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
-				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(appId, jid2, vid2, eid2, "TestTask2", 2, 7, 0,
+				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7, 0,
 						new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -518,13 +515,13 @@ public class TaskManagerTest extends TestLogger {
 				final ExecutionAttemptID eid1 = new ExecutionAttemptID();
 				final ExecutionAttemptID eid2 = new ExecutionAttemptID();
 
-				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(appId, jid, vid1, eid1, "Sender", 0, 1, 0,
+				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1, 0,
 						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
 						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
-				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(appId, jid, vid2, eid2, "Receiver", 2, 7, 0,
+				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7, 0,
 						new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -617,12 +614,12 @@ public class TaskManagerTest extends TestLogger {
 								}
 						);
 
-				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(appId, jid, vid1, eid1, "Sender", 0, 1, 0,
+				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1, 0,
 						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
 						irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(),
 						Collections.<URL>emptyList(), 0);
 
-				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(appId, jid, vid2, eid2, "Receiver", 2, 7, 0,
+				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7, 0,
 						new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.singletonList(ircdd),
@@ -756,12 +753,12 @@ public class TaskManagerTest extends TestLogger {
 								}
 						);
 
-				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(appId, jid, vid1, eid1, "Sender", 0, 1, 0,
+				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1, 0,
 						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
 						irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(),
 						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
 
-				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(appId, jid, vid2, eid2, "Receiver", 2, 7, 0,
+				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7, 0,
 						new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.singletonList(ircdd),
@@ -899,7 +896,7 @@ public class TaskManagerTest extends TestLogger {
 						new InputGateDeploymentDescriptor(resultId, 0, icdd);
 
 				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-						appId, jid, vid, eid, "Receiver", 0, 1, 0,
+						jid, vid, eid, "Receiver", 0, 1, 0,
 						new Configuration(), new Configuration(),
 						Tasks.AgnosticReceiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
@@ -992,7 +989,7 @@ public class TaskManagerTest extends TestLogger {
 						new InputGateDeploymentDescriptor(resultId, 0, icdd);
 
 				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-						appId, jid, vid, eid, "Receiver", 0, 1, 0,
+						jid, vid, eid, "Receiver", 0, 1, 0,
 						new Configuration(), new Configuration(),
 						Tasks.AgnosticReceiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
@@ -1076,7 +1073,6 @@ public class TaskManagerTest extends TestLogger {
 
 				// Single blocking task
 				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-						new ApplicationID(),
 						new JobID(),
 						new JobVertexID(),
 						new ExecutionAttemptID(),

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index 8ad823c..a60e074 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -17,9 +17,6 @@
  */
 package org.apache.flink.runtime.taskmanager;
 
-import java.lang.reflect.Field;
-
-import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
@@ -40,8 +37,10 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
-
 import scala.concurrent.duration.FiniteDuration;
+
+import java.lang.reflect.Field;
+
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -57,7 +56,6 @@ public class TaskStopTest {
 
 		TaskDeploymentDescriptor tddMock = mock(TaskDeploymentDescriptor.class);
 		when(tddMock.getTaskInfo()).thenReturn(taskInfoMock);
-		when(tddMock.getApplicationID()).thenReturn(new ApplicationID());
 		when(tddMock.getJobID()).thenReturn(mock(JobID.class));
 		when(tddMock.getVertexID()).thenReturn(mock(JobVertexID.class));
 		when(tddMock.getExecutionId()).thenReturn(mock(ExecutionAttemptID.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/50a166df/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index f7c0ae0..45ca364 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskmanager;
 
 import com.google.common.collect.Maps;
 
-import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -627,7 +626,7 @@ public class TaskTest {
 
 	private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) {
 		return new TaskDeploymentDescriptor(
-				new ApplicationID(), new JobID(), new JobVertexID(), new ExecutionAttemptID(),
+				new JobID(), new JobVertexID(), new ExecutionAttemptID(),
 				"Test Task", 0, 1, 0,
 				new Configuration(), new Configuration(),
 				invokable.getName(),


Mime
View raw message