flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [10/14] flink git commit: [FLINK-7068][blob] Introduce permanent and transient BLOB keys
Date Thu, 05 Oct 2017 14:07:03 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
index 8f1aa06..35575b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.blob;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -38,10 +37,14 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Random;
 
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.junit.Assert.assertEquals;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -61,7 +64,6 @@ public class BlobServerRecoveryTest extends TestLogger {
 	public void testBlobServerRecovery() throws Exception {
 		Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
 
@@ -82,7 +84,7 @@ public class BlobServerRecoveryTest extends TestLogger {
 	 * Helper to test that the {@link BlobServer} recovery from its HA store works.
 	 *
 	 * <p>Uploads two BLOBs to one {@link BlobServer} and expects a second one to be able to retrieve
-	 * them via a shared HA store upon request of a {@link BlobCache}.
+	 * them via a shared HA store upon request of a {@link BlobCacheService}.
 	 *
 	 * @param config
 	 * 		blob server configuration (including HA settings like {@link HighAvailabilityOptions#HA_STORAGE_PATH}
@@ -102,7 +104,7 @@ public class BlobServerRecoveryTest extends TestLogger {
 			BlobServer server0 = new BlobServer(config, blobStore);
 			BlobServer server1 = new BlobServer(config, blobStore);
 			// use VoidBlobStore as the HA store to force download from server[1]'s HA store
-			BlobCache cache1 = new BlobCache(
+			BlobCacheService cache1 = new BlobCacheService(
 				new InetSocketAddress("localhost", server1.getPort()), config,
 				new VoidBlobStore())) {
 
@@ -119,12 +121,13 @@ public class BlobServerRecoveryTest extends TestLogger {
 
 			// Put job-related HA data
 			JobID[] jobId = new JobID[] { new JobID(), new JobID() };
-			keys[0] = put(server0, jobId[0], expected, true); // Request 1
-			keys[1] = put(server0, jobId[1], expected2, true); // Request 2
+			keys[0] = put(server0, jobId[0], expected, PERMANENT_BLOB); // Request 1
+			keys[1] = put(server0, jobId[1], expected2, PERMANENT_BLOB); // Request 2
 
 			// put non-HA data
-			nonHAKey = put(server0, jobId[0], expected2, false);
-			assertEquals(keys[1], nonHAKey);
+			nonHAKey = put(server0, jobId[0], expected2, TRANSIENT_BLOB);
+			assertNotEquals(keys[1], nonHAKey);
+			assertThat(keys[1].getHash(), equalTo(nonHAKey.getHash()));
 
 			// check that the storage directory exists
 			final Path blobServerPath = new Path(storagePath, "blob");
@@ -132,11 +135,11 @@ public class BlobServerRecoveryTest extends TestLogger {
 			assertTrue("Unknown storage dir: " + blobServerPath, fs.exists(blobServerPath));
 
 			// Verify HA requests from cache1 (connected to server1) with no immediate access to the file
-			verifyContents(cache1, jobId[0], keys[0], expected, true);
-			verifyContents(cache1, jobId[1], keys[1], expected2, true);
+			verifyContents(cache1, jobId[0], keys[0], expected);
+			verifyContents(cache1, jobId[1], keys[1], expected2);
 
 			// Verify non-HA file is not accessible from server1
-			verifyDeleted(cache1, jobId[0], nonHAKey, true);
+			verifyDeleted(cache1, jobId[0], nonHAKey);
 
 			// Remove again
 			server1.cleanupJob(jobId[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
index bcb09d5..6bc06e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
@@ -73,12 +73,18 @@ public class BlobUtilsTest extends TestLogger {
 	@Test(expected = IOException.class)
 	public void testExceptionOnCreateCacheDirectoryFailureNoJob() throws IOException {
 		// Should throw an Exception
-		BlobUtils.getStorageLocation(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS), null, new BlobKey());
+		BlobUtils.getStorageLocation(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS), null, new TransientBlobKey());
 	}
 
 	@Test(expected = IOException.class)
-	public void testExceptionOnCreateCacheDirectoryFailureForJob() throws IOException {
+	public void testExceptionOnCreateCacheDirectoryFailureForJobTransient() throws IOException {
 		// Should throw an Exception
-		BlobUtils.getStorageLocation(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS), new JobID(), new BlobKey());
+		BlobUtils.getStorageLocation(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS), new JobID(), new TransientBlobKey());
+	}
+
+	@Test(expected = IOException.class)
+	public void testExceptionOnCreateCacheDirectoryFailureForJobPermanent() throws IOException {
+		// Should throw an Exception
+		BlobUtils.getStorageLocation(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS), new JobID(), new PermanentBlobKey());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index e083740..744fd60 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -36,7 +35,6 @@ import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
 
-import java.net.URL;
 import java.util.Collections;
 
 import static org.mockito.Matchers.eq;
@@ -92,8 +90,8 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 			Time.days(1L),
 			new NoRestartStrategy(),
 			new RestartAllStrategy.Factory(),
-			Collections.<BlobKey>emptyList(),
-			Collections.<URL>emptyList(),
+			Collections.emptyList(),
+			Collections.emptyList(),
 			new Scheduler(TestingUtils.defaultExecutionContext()),
 			ClassLoader.getSystemClassLoader());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 9ed4851..92613ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -65,8 +65,8 @@ public class TaskDeploymentDescriptorTest {
 			final Class<? extends AbstractInvokable> invokableClass = BatchTask.class;
 			final List<ResultPartitionDeploymentDescriptor> producedResults = new ArrayList<ResultPartitionDeploymentDescriptor>(0);
 			final List<InputGateDeploymentDescriptor> inputGates = new ArrayList<InputGateDeploymentDescriptor>(0);
-			final List<BlobKey> requiredJars = new ArrayList<BlobKey>(0);
-			final List<URL> requiredClasspaths = new ArrayList<URL>(0);
+			final List<PermanentBlobKey> requiredJars = new ArrayList<>(0);
+			final List<URL> requiredClasspaths = new ArrayList<>(0);
 			final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig());
 			final SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(new JobInformation(
 				jobID, jobName, executionConfig, jobConfiguration, requiredJars, requiredClasspaths));

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 7715eb2..cb5d608 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -21,9 +21,9 @@ package org.apache.flink.runtime.execution.librarycache;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.util.OperatingSystem;
@@ -68,8 +68,8 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
 
 		JobID jobId1 = new JobID();
 		JobID jobId2 = new JobID();
-		List<BlobKey> keys1 = new ArrayList<>();
-		List<BlobKey> keys2 = new ArrayList<>();
+		List<PermanentBlobKey> keys1 = new ArrayList<>();
+		List<PermanentBlobKey> keys2 = new ArrayList<>();
 		BlobServer server = null;
 		PermanentBlobCache cache = null;
 		BlobLibraryCacheManager libCache = null;
@@ -87,10 +87,10 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			cache = new PermanentBlobCache(serverAddress, config, new VoidBlobStore());
 
-			keys1.add(server.putHA(jobId1, buf));
+			keys1.add(server.putPermanent(jobId1, buf));
 			buf[0] += 1;
-			keys1.add(server.putHA(jobId1, buf));
-			keys2.add(server.putHA(jobId2, buf));
+			keys1.add(server.putPermanent(jobId1, buf));
+			keys2.add(server.putPermanent(jobId2, buf));
 
 			libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
 			cache.registerJob(jobId1);
@@ -200,7 +200,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
 		JobID jobId = new JobID();
 		ExecutionAttemptID attempt1 = new ExecutionAttemptID();
 		ExecutionAttemptID attempt2 = new ExecutionAttemptID();
-		List<BlobKey> keys = new ArrayList<>();
+		List<PermanentBlobKey> keys = new ArrayList<>();
 		BlobServer server = null;
 		PermanentBlobCache cache = null;
 		BlobLibraryCacheManager libCache = null;
@@ -218,9 +218,9 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			cache = new PermanentBlobCache(serverAddress, config, new VoidBlobStore());
 
-			keys.add(server.putHA(jobId, buf));
+			keys.add(server.putPermanent(jobId, buf));
 			buf[0] += 1;
-			keys.add(server.putHA(jobId, buf));
+			keys.add(server.putPermanent(jobId, buf));
 
 			libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
 			cache.registerJob(jobId);
@@ -245,8 +245,8 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
 
 			try {
 				libCache.registerTask(
-					jobId, new ExecutionAttemptID(), Collections.<BlobKey>emptyList(),
-					Collections.<URL>emptyList());
+					jobId, new ExecutionAttemptID(), Collections.emptyList(),
+					Collections.emptyList());
 				fail("Should fail with an IllegalStateException");
 			}
 			catch (IllegalStateException e) {
@@ -312,7 +312,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
 
 		JobID jobId = new JobID();
 		ExecutionAttemptID attempt1 = new ExecutionAttemptID();
-		List<BlobKey> keys = new ArrayList<>();
+		List<PermanentBlobKey> keys = new ArrayList<>();
 		BlobServer server = null;
 		PermanentBlobCache cache = null;
 		BlobLibraryCacheManager libCache = null;
@@ -330,9 +330,9 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			cache = new PermanentBlobCache(serverAddress, config, new VoidBlobStore());
 
-			keys.add(server.putHA(jobId, buf));
+			keys.add(server.putPermanent(jobId, buf));
 			buf[0] += 1;
-			keys.add(server.putHA(jobId, buf));
+			keys.add(server.putPermanent(jobId, buf));
 
 			libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
 			cache.registerJob(jobId);
@@ -357,8 +357,8 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
 
 			try {
 				libCache.registerTask(
-					jobId, new ExecutionAttemptID(), Collections.<BlobKey>emptyList(),
-					Collections.<URL>emptyList());
+					jobId, new ExecutionAttemptID(), Collections.emptyList(),
+					Collections.emptyList());
 				fail("Should fail with an IllegalStateException");
 			}
 			catch (IllegalStateException e) {
@@ -437,8 +437,8 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
 			cache = new PermanentBlobCache(serverAddress, config, new VoidBlobStore());
 
 			// upload some meaningless data to the server
-			BlobKey dataKey1 = server.putHA(jobId, new byte[]{1, 2, 3, 4, 5, 6, 7, 8});
-			BlobKey dataKey2 = server.putHA(jobId, new byte[]{11, 12, 13, 14, 15, 16, 17, 18});
+			PermanentBlobKey dataKey1 = server.putPermanent(jobId, new byte[]{1, 2, 3, 4, 5, 6, 7, 8});
+			PermanentBlobKey dataKey2 = server.putPermanent(jobId, new byte[]{11, 12, 13, 14, 15, 16, 17, 18});
 
 			libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
 			assertEquals(0, libCache.getNumberOfManagedJobs());
@@ -457,7 +457,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
 
 			// register some BLOBs as libraries
 			{
-				Collection<BlobKey> keys = Collections.singleton(dataKey1);
+				Collection<PermanentBlobKey> keys = Collections.singleton(dataKey1);
 
 				cache.registerJob(jobId);
 				ExecutionAttemptID executionId = new ExecutionAttemptID();
@@ -514,7 +514,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
 			}
 
 			// see BlobUtils for the directory layout
-			cacheDir = cache.getStorageLocation(jobId, new BlobKey()).getParentFile();
+			cacheDir = cache.getStorageLocation(jobId, new PermanentBlobKey()).getParentFile();
 			assertTrue(cacheDir.exists());
 
 			// make sure no further blobs can be downloaded by removing the write

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index d7fbb7d..2eac17a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -21,13 +21,12 @@ package org.apache.flink.runtime.execution.librarycache;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.BlobStoreService;
 import org.apache.flink.runtime.blob.BlobUtils;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.TestLogger;
@@ -43,7 +42,6 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
@@ -72,7 +70,6 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 
 		Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
@@ -93,14 +90,14 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			byte[] expected = new byte[1024];
 			rand.nextBytes(expected);
 
-			List<BlobKey> keys = new ArrayList<>(2);
+			ArrayList<PermanentBlobKey> keys = new ArrayList<>(2);
 
 			JobID jobId = new JobID();
 
 			// Upload some data (libraries)
-			keys.add(server[0].putHA(jobId, expected)); // Request 1
+			keys.add(server[0].putPermanent(jobId, expected)); // Request 1
 			byte[] expected2 = Arrays.copyOfRange(expected, 32, 288);
-			keys.add(server[0].putHA(jobId, expected2)); // Request 2
+			keys.add(server[0].putPermanent(jobId, expected2)); // Request 2
 
 			// The cache
 			cache = new PermanentBlobCache(serverAddress[0], config, blobStoreService);
@@ -110,7 +107,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			libServer[0].registerTask(jobId, executionId, keys, Collections.<URL>emptyList());
 
 			// Verify key 1
-			File f = cache.getHAFile(jobId, keys.get(0));
+			File f = cache.getFile(jobId, keys.get(0));
 			assertEquals(expected.length, f.length());
 
 			try (FileInputStream fis = new FileInputStream(f)) {
@@ -127,7 +124,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			cache = new PermanentBlobCache(serverAddress[1], config, blobStoreService);
 
 			// Verify key 1
-			f = cache.getHAFile(jobId, keys.get(0));
+			f = cache.getFile(jobId, keys.get(0));
 			assertEquals(expected.length, f.length());
 
 			try (FileInputStream fis = new FileInputStream(f)) {
@@ -139,7 +136,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			}
 
 			// Verify key 2
-			f = cache.getHAFile(jobId, keys.get(1));
+			f = cache.getFile(jobId, keys.get(1));
 			assertEquals(expected2.length, f.length());
 
 			try (FileInputStream fis = new FileInputStream(f)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index beeefb1b..6b872ef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
@@ -50,7 +49,6 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -140,8 +138,8 @@ public class FailoverRegionTest extends TestLogger {
 				AkkaUtils.getDefaultTimeout(),
 				new InfiniteDelayRestartStrategy(10),
 				new FailoverPipelinedRegionWithDirectExecutor(),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
+				Collections.emptyList(),
+				Collections.emptyList(),
 				slotProvider,
 				ExecutionGraph.class.getClassLoader());
 
@@ -267,8 +265,8 @@ public class FailoverRegionTest extends TestLogger {
 				AkkaUtils.getDefaultTimeout(),
 				new InfiniteDelayRestartStrategy(10),
 				new RestartPipelinedRegionStrategy.Factory(),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
+				Collections.emptyList(),
+				Collections.emptyList(),
 				scheduler,
 				ExecutionGraph.class.getClassLoader());
 		try {
@@ -344,8 +342,8 @@ public class FailoverRegionTest extends TestLogger {
 				AkkaUtils.getDefaultTimeout(),
 				new InfiniteDelayRestartStrategy(10),
 				new FailoverPipelinedRegionWithDirectExecutor(),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
+				Collections.emptyList(),
+				Collections.emptyList(),
 				scheduler,
 				ExecutionGraph.class.getClassLoader());
 		try {
@@ -457,8 +455,8 @@ public class FailoverRegionTest extends TestLogger {
 				AkkaUtils.getDefaultTimeout(),
 				restartStrategy,
 				new FailoverPipelinedRegionWithDirectExecutor(),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
+				Collections.emptyList(),
+				Collections.emptyList(),
 				scheduler,
 				ExecutionGraph.class.getClassLoader());
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
index 4dd55ae..f441970 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
@@ -37,7 +36,6 @@ import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
 
-import java.net.URL;
 import java.util.Collections;
 import java.util.Random;
 
@@ -174,8 +172,8 @@ public class GlobalModVersionTest {
 				Time.seconds(10),
 				new InfiniteDelayRestartStrategy(),
 				new CustomStrategy(failoverStrategy),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
+				Collections.emptyList(),
+				Collections.emptyList(),
 				slotProvider,
 				getClass().getClassLoader());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
index 713aece..c52b59c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy.Factory;
@@ -42,7 +41,6 @@ import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
 
-import java.net.URL;
 import java.util.Collections;
 import java.util.concurrent.Executor;
 
@@ -298,8 +296,8 @@ public class IndividualRestartsConcurrencyTest {
 				Time.seconds(10),
 				restartStrategy,
 				failoverStrategy,
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
+				Collections.emptyList(),
+				Collections.emptyList(),
 				slotProvider,
 				getClass().getClassLoader());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
index 635ec75..3c58616 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
@@ -42,7 +41,6 @@ import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
 
-import java.net.URL;
 import java.util.Collections;
 import java.util.concurrent.Executor;
 
@@ -320,8 +318,8 @@ public class PipelinedRegionFailoverConcurrencyTest {
 				Time.seconds(10),
 				restartStrategy,
 				failoverStrategy,
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
+				Collections.emptyList(),
+				Collections.emptyList(),
 				slotProvider,
 				getClass().getClassLoader());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
index 45aabe6..7c4151b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
 import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -36,7 +35,6 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -105,8 +103,8 @@ public class RestartPipelinedRegionStrategyTest {
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
             new RestartPipelinedRegionStrategy.Factory(),
-            Collections.<BlobKey>emptyList(),
-            Collections.<URL>emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
             scheduler,
             ExecutionGraph.class.getClassLoader());
 		try {
@@ -190,8 +188,8 @@ public class RestartPipelinedRegionStrategyTest {
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
             new RestartPipelinedRegionStrategy.Factory(),
-            Collections.<BlobKey>emptyList(),
-            Collections.<URL>emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
             scheduler,
             ExecutionGraph.class.getClassLoader());
 		try {
@@ -280,8 +278,8 @@ public class RestartPipelinedRegionStrategyTest {
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
             new RestartPipelinedRegionStrategy.Factory(),
-            Collections.<BlobKey>emptyList(),
-            Collections.<URL>emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
             scheduler,
             ExecutionGraph.class.getClassLoader());
 		try {
@@ -361,8 +359,8 @@ public class RestartPipelinedRegionStrategyTest {
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
             new RestartPipelinedRegionStrategy.Factory(),
-            Collections.<BlobKey>emptyList(),
-            Collections.<URL>emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
             scheduler,
             ExecutionGraph.class.getClassLoader());
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index a38f674..501bedd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.executiongraph.utils;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -123,12 +123,12 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 			CheckpointOptions checkpointOptions) {}
 
 	@Override
-	public CompletableFuture<BlobKey> requestTaskManagerLog(Time timeout) {
+	public CompletableFuture<TransientBlobKey> requestTaskManagerLog(Time timeout) {
 		return FutureUtils.completedExceptionally(new UnsupportedOperationException());
 	}
 
 	@Override
-	public CompletableFuture<BlobKey> requestTaskManagerStdout(Time timeout) {
+	public CompletableFuture<TransientBlobKey> requestTaskManagerStdout(Time timeout) {
 		return FutureUtils.completedExceptionally(new UnsupportedOperationException());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
index cc4812b..5f556ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
@@ -29,7 +29,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobClient;
-import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
@@ -188,7 +188,7 @@ public class JobManagerCleanupITCase extends TestLogger {
 
 						// upload a blob
 						tempBlob = File.createTempFile("Required", ".jar");
-						List<BlobKey> keys =
+						List<PermanentBlobKey> keys =
 							BlobClient.uploadJarFiles(new InetSocketAddress("localhost", blobPort),
 								config, jid,
 								Collections.singletonList(new Path(tempBlob.getAbsolutePath())));
@@ -197,7 +197,7 @@ public class JobManagerCleanupITCase extends TestLogger {
 
 						if (testCase == TestCase.JOB_SUBMISSION_FAILS) {
 							// add an invalid key so that the submission fails
-							jobGraph.addBlob(new BlobKey());
+							jobGraph.addBlob(new PermanentBlobKey());
 						}
 
 						// Submit the job and wait for all vertices to be running
@@ -275,7 +275,7 @@ public class JobManagerCleanupITCase extends TestLogger {
 	 *
 	 * @param blobDir
 	 * 		directory of a {@link org.apache.flink.runtime.blob.BlobServer} or {@link
-	 * 		org.apache.flink.runtime.blob.BlobCache}
+	 * 		org.apache.flink.runtime.blob.BlobCacheService}
 	 * @param remaining
 	 * 		remaining time for this test
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 631dd5f..340a735 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
@@ -132,7 +132,7 @@ public class JobSubmitTest {
 			JobGraph jg = new JobGraph("test job", jobVertex);
 
 			// add a reference to some non-existing BLOB to the job graph as a dependency
-			jg.addBlob(new BlobKey());
+			jg.addBlob(new PermanentBlobKey());
 
 			// submit the job
 			Future<Object> submitFuture = jmGateway.ask(

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
index 543b2e6..8e76674 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.rest.handler.legacy;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.instance.Instance;
@@ -96,7 +96,7 @@ public class TaskManagerLogHandlerTest {
 		when(taskManager.getId()).thenReturn(tmID);
 		when(taskManager.getTaskManagerID()).thenReturn(tmRID);
 		when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
-		CompletableFuture<BlobKey> future = new CompletableFuture<>();
+		CompletableFuture<TransientBlobKey> future = new CompletableFuture<>();
 		future.completeExceptionally(new IOException("failure"));
 		when(taskManagerGateway.requestTaskManagerLog(any(Time.class))).thenReturn(future);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 8c27ee5..0413496 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -22,8 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobCache;
-import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -99,7 +98,6 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 
 import java.net.InetAddress;
-import java.net.URL;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -667,8 +665,8 @@ public class TaskExecutorTest extends TestLogger {
 				name.getMethodName(),
 				new SerializedValue<>(new ExecutionConfig()),
 				new Configuration(),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList());
+				Collections.emptyList(),
+				Collections.emptyList());
 
 		TaskInformation taskInformation = new TaskInformation(
 				jobVertexId,
@@ -699,12 +697,8 @@ public class TaskExecutorTest extends TestLogger {
 		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
 		when(jobMasterGateway.getFencingToken()).thenReturn(jobMasterId);
 
-		BlobCache blobCache = mock(BlobCache.class);
-		PermanentBlobCache permanentBlobCache = mock(PermanentBlobCache.class);
-		TransientBlobCache transientBlobCache = mock(TransientBlobCache.class);
-
-		when(blobCache.getPermanentBlobStore()).thenReturn(permanentBlobCache);
-		when(blobCache.getTransientBlobStore()).thenReturn(transientBlobCache);
+		BlobCacheService blobService =
+			new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));
 
 		final JobManagerConnection jobManagerConnection = new JobManagerConnection(
 			jobId,
@@ -712,7 +706,7 @@ public class TaskExecutorTest extends TestLogger {
 			jobMasterGateway,
 			mock(TaskManagerActions.class),
 			mock(CheckpointResponder.class),
-			blobCache,
+			blobService,
 			libraryCacheManager,
 			mock(ResultPartitionConsumableNotifier.class),
 			mock(PartitionProducerStateChecker.class));
@@ -1212,12 +1206,8 @@ public class TaskExecutorTest extends TestLogger {
 		final LibraryCacheManager libraryCacheManager = mock(LibraryCacheManager.class);
 		when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader());
 
-		BlobCache blobCache = mock(BlobCache.class);
-		PermanentBlobCache permanentBlobCache = mock(PermanentBlobCache.class);
-		TransientBlobCache transientBlobCache = mock(TransientBlobCache.class);
-
-		when(blobCache.getPermanentBlobStore()).thenReturn(permanentBlobCache);
-		when(blobCache.getTransientBlobStore()).thenReturn(transientBlobCache);
+		BlobCacheService blobService =
+			new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));
 
 		final JobManagerConnection jobManagerConnection = new JobManagerConnection(
 			jobId,
@@ -1225,7 +1215,7 @@ public class TaskExecutorTest extends TestLogger {
 			jobMasterGateway,
 			mock(TaskManagerActions.class),
 			mock(CheckpointResponder.class),
-			blobCache,
+			blobService,
 			libraryCacheManager,
 			mock(ResultPartitionConsumableNotifier.class),
 			mock(PartitionProducerStateChecker.class));
@@ -1274,8 +1264,8 @@ public class TaskExecutorTest extends TestLogger {
 				name.getMethodName(),
 				new SerializedValue<>(new ExecutionConfig()),
 				new Configuration(),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList());
+				Collections.emptyList(),
+				Collections.emptyList());
 
 			TaskInformation taskInformation = new TaskInformation(
 				jobVertexId,

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index b61d1b6..82dd99e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -22,8 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.blob.BlobCache;
-import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -59,7 +58,6 @@ import org.apache.flink.util.SerializedValue;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.net.URL;
 import java.util.Collections;
 import java.util.concurrent.Executor;
 
@@ -148,12 +146,8 @@ public class TaskAsyncCallTest {
 	}
 	
 	private static Task createTask() throws Exception {
-		BlobCache blobCache = mock(BlobCache.class);
-		PermanentBlobCache permanentBlobCache = mock(PermanentBlobCache.class);
-		TransientBlobCache transientBlobCache = mock(TransientBlobCache.class);
-
-		when(blobCache.getPermanentBlobStore()).thenReturn(permanentBlobCache);
-		when(blobCache.getTransientBlobStore()).thenReturn(transientBlobCache);
+		BlobCacheService blobService =
+			new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));
 
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
@@ -176,8 +170,8 @@ public class TaskAsyncCallTest {
 			"Job Name",
 			new SerializedValue<>(new ExecutionConfig()),
 			new Configuration(),
-			Collections.<BlobKey>emptyList(),
-			Collections.<URL>emptyList());
+			Collections.emptyList(),
+			Collections.emptyList());
 
 		TaskInformation taskInformation = new TaskInformation(
 			new JobVertexID(),
@@ -205,7 +199,7 @@ public class TaskAsyncCallTest {
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
-			blobCache,
+			blobService,
 			libCache,
 			mock(FileCache.class),
 			new TestingTaskManagerRuntimeInfo(),

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index fdae251..4577185 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -210,7 +210,7 @@ public class TaskManagerTest extends TestLogger {
 					TestInvokableCorrect.class.getName(),
 					Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 					Collections.<InputGateDeploymentDescriptor>emptyList(),
-					new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
+					new ArrayList<PermanentBlobKey>(), Collections.emptyList(), 0);
 
 
 				new Within(d) {
@@ -313,7 +313,7 @@ public class TaskManagerTest extends TestLogger {
 						new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
-						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
+						new ArrayList<>(), Collections.<URL>emptyList(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = createTaskDeploymentDescriptor(
 						jid2, "TestJob2", vid2, eid2,
@@ -322,7 +322,7 @@ public class TaskManagerTest extends TestLogger {
 						new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
-						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
+						new ArrayList<>(), Collections.emptyList(), 0);
 
 				final ActorGateway tm = taskManager;
 
@@ -453,13 +453,13 @@ public class TaskManagerTest extends TestLogger {
 						"TestTask1", 5, 1, 5, 0, new Configuration(), new Configuration(), StoppableInvokable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
-						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
+						new ArrayList<>(), Collections.emptyList(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = createTaskDeploymentDescriptor(jid2, "TestJob", vid2, eid2, executionConfig,
 						"TestTask2", 7, 2, 7, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
-						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
+						new ArrayList<>(), Collections.emptyList(), 0);
 
 				final ActorGateway tm = taskManager;
 
@@ -585,7 +585,7 @@ public class TaskManagerTest extends TestLogger {
 						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
-						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
+						new ArrayList<>(), Collections.emptyList(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = createTaskDeploymentDescriptor(
 						jid, "TestJob", vid2, eid2,
@@ -594,7 +594,7 @@ public class TaskManagerTest extends TestLogger {
 						new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
-						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
+						new ArrayList<>(), Collections.emptyList(), 0);
 
 				new Within(d){
 
@@ -692,8 +692,8 @@ public class TaskManagerTest extends TestLogger {
 						new SerializedValue<>(new ExecutionConfig()),
 						"Sender", 1, 0, 1, 0,
 						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
-						irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(),
-						Collections.<URL>emptyList(), 0);
+						irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<>(),
+						Collections.emptyList(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = createTaskDeploymentDescriptor(
 						jid, "TestJob", vid2, eid2,
@@ -702,7 +702,7 @@ public class TaskManagerTest extends TestLogger {
 						new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.singletonList(ircdd),
-						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
+						new ArrayList<>(), Collections.emptyList(), 0);
 
 				new Within(d) {
 
@@ -842,7 +842,7 @@ public class TaskManagerTest extends TestLogger {
 						"Sender", 1, 0, 1, 0,
 						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
 						irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(),
-						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
+						new ArrayList<>(), Collections.emptyList(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = createTaskDeploymentDescriptor(
 						jid, "TestJob", vid2, eid2,
@@ -851,7 +851,7 @@ public class TaskManagerTest extends TestLogger {
 						new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.singletonList(ircdd),
-						new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
+						new ArrayList<>(), Collections.emptyList(), 0);
 
 				new Within(d){
 
@@ -998,8 +998,8 @@ public class TaskManagerTest extends TestLogger {
 						Tasks.AgnosticReceiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.singletonList(igdd),
-						Collections.<BlobKey>emptyList(),
-						Collections.<URL>emptyList(), 0);
+						Collections.emptyList(),
+						Collections.emptyList(), 0);
 
 				new Within(d) {
 					@Override
@@ -1116,8 +1116,8 @@ public class TaskManagerTest extends TestLogger {
 						Tasks.AgnosticReceiver.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.singletonList(igdd),
-						Collections.<BlobKey>emptyList(),
-						Collections.<URL>emptyList(), 0);
+						Collections.emptyList(),
+						Collections.emptyList(), 0);
 
 				new Within(new FiniteDuration(120, TimeUnit.SECONDS)) {
 					@Override
@@ -1266,8 +1266,8 @@ public class TaskManagerTest extends TestLogger {
 						BlockingNoOpInvokable.class.getName(),
 						Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 						Collections.<InputGateDeploymentDescriptor>emptyList(),
-						Collections.<BlobKey>emptyList(),
-						Collections.<URL>emptyList(),
+						Collections.emptyList(),
+						Collections.emptyList(),
 						0);
 
 				// Submit the task
@@ -1587,7 +1587,7 @@ public class TaskManagerTest extends TestLogger {
 				TestInvokableRecordCancel.class.getName(),
 				Collections.singletonList(resultPartitionDeploymentDescriptor),
 				Collections.<InputGateDeploymentDescriptor>emptyList(),
-				new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
+				new ArrayList<>(), Collections.emptyList(), 0);
 
 
 			ActorRef jmActorRef = system.actorOf(Props.create(FailingScheduleOrUpdateConsumersJobManager.class, leaderSessionID), "jobmanager");
@@ -1662,8 +1662,8 @@ public class TaskManagerTest extends TestLogger {
 				"Foobar",
 				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 				Collections.<InputGateDeploymentDescriptor>emptyList(),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
+				Collections.emptyList(),
+				Collections.emptyList(),
 				0);
 
 			Future<Object> submitResponse = taskManager.ask(new SubmitTask(tdd), timeout);
@@ -1723,8 +1723,8 @@ public class TaskManagerTest extends TestLogger {
 				BlockingNoOpInvokable.class.getName(),
 				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 				Collections.<InputGateDeploymentDescriptor>emptyList(),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
+				Collections.emptyList(),
+				Collections.emptyList(),
 				0);
 
 			Future<Object> submitResponse = taskManager.ask(new SubmitTask(tdd), timeout);
@@ -1836,8 +1836,8 @@ public class TaskManagerTest extends TestLogger {
 				BlockingNoOpInvokable.class.getName(),
 				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 				Collections.<InputGateDeploymentDescriptor>emptyList(),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
+				Collections.emptyList(),
+				Collections.emptyList(),
 				0);
 
 			Future<Object> submitResponse = taskManager.ask(new SubmitTask(tdd), timeout);
@@ -2125,7 +2125,7 @@ public class TaskManagerTest extends TestLogger {
 		String invokableClassName,
 		Collection<ResultPartitionDeploymentDescriptor> producedPartitions,
 		Collection<InputGateDeploymentDescriptor> inputGates,
-		Collection<BlobKey> requiredJarFiles,
+		Collection<PermanentBlobKey> requiredJarFiles,
 		Collection<URL> requiredClasspaths,
 		int targetSlotNumber) throws IOException {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index ee95862..d062def 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobCache;
+import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -77,12 +77,8 @@ public class TaskStopTest {
 		TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
 		when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
 
-		BlobCache blobCache = mock(BlobCache.class);
-		PermanentBlobCache permanentBlobCache = mock(PermanentBlobCache.class);
-		TransientBlobCache transientBlobCache = mock(TransientBlobCache.class);
-
-		when(blobCache.getPermanentBlobStore()).thenReturn(permanentBlobCache);
-		when(blobCache.getTransientBlobStore()).thenReturn(transientBlobCache);
+		BlobCacheService blobService =
+			new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));
 
 		task = new Task(
 			mock(JobInformation.class),
@@ -108,7 +104,7 @@ public class TaskStopTest {
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
-			blobCache,
+			blobService,
 			mock(LibraryCacheManager.class),
 			mock(FileCache.class),
 			tmRuntimeInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index bd86abf..b089997 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -23,8 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.blob.BlobCache;
-import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -70,7 +69,6 @@ import scala.concurrent.duration.FiniteDuration;
 import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.lang.reflect.Field;
-import java.net.URL;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -230,8 +228,8 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testLibraryCacheRegistrationFailed() {
 		try {
-			BlobCache blobCache = createBlobCache();
-			Task task = createTask(TestInvokableCorrect.class, blobCache,
+			BlobCacheService blobService = createBlobCache();
+			Task task = createTask(TestInvokableCorrect.class, blobService,
 				mock(LibraryCacheManager.class));
 
 			// task should be new and perfect
@@ -266,7 +264,7 @@ public class TaskTest extends TestLogger {
 	@Test
 	public void testExecutionFailsInNetworkRegistration() {
 		try {
-			BlobCache blobCache = createBlobCache();
+			BlobCacheService blobService = createBlobCache();
 			// mock a working library cache
 			LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 			when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
@@ -281,7 +279,7 @@ public class TaskTest extends TestLogger {
 			when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
 			doThrow(new RuntimeException("buffers")).when(network).registerTask(any(Task.class));
 
-			Task task = createTask(TestInvokableCorrect.class, blobCache, libCache, network, consumableNotifier, partitionProducerStateChecker, executor);
+			Task task = createTask(TestInvokableCorrect.class, blobService, libCache, network, consumableNotifier, partitionProducerStateChecker, executor);
 
 			task.registerExecutionListener(listener);
 
@@ -626,7 +624,7 @@ public class TaskTest extends TestLogger {
 		IntermediateDataSetID resultId = new IntermediateDataSetID();
 		ResultPartitionID partitionId = new ResultPartitionID();
 
-		BlobCache blobCache = createBlobCache();
+		BlobCacheService blobService = createBlobCache();
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
 
@@ -639,7 +637,7 @@ public class TaskTest extends TestLogger {
 		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 			.thenReturn(mock(TaskKvStateRegistry.class));
 
-		createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+		createTask(InvokableBlockingInInvoke.class, blobService, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
 
 		// Test all branches of trigger partition state check
 
@@ -648,7 +646,7 @@ public class TaskTest extends TestLogger {
 			createQueuesAndActors();
 
 			// PartitionProducerDisposedException
-			Task task = createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+			Task task = createTask(InvokableBlockingInInvoke.class, blobService, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
 
 			CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 			when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
@@ -664,7 +662,7 @@ public class TaskTest extends TestLogger {
 			createQueuesAndActors();
 
 			// Any other exception
-			Task task = createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+			Task task = createTask(InvokableBlockingInInvoke.class, blobService, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
 
 			CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
 			when(partitionChecker.requestPartitionProducerState(eq(task.getJobID()), eq(resultId), eq(partitionId))).thenReturn(promise);
@@ -681,7 +679,7 @@ public class TaskTest extends TestLogger {
 			createQueuesAndActors();
 
 			// TimeoutException handled special => retry
-			Task task = createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+			Task task = createTask(InvokableBlockingInInvoke.class, blobService, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
 			SingleInputGate inputGate = mock(SingleInputGate.class);
 			when(inputGate.getConsumedResultId()).thenReturn(resultId);
 
@@ -712,7 +710,7 @@ public class TaskTest extends TestLogger {
 			createQueuesAndActors();
 
 			// Success
-			Task task = createTask(InvokableBlockingInInvoke.class, blobCache, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
+			Task task = createTask(InvokableBlockingInInvoke.class, blobService, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
 			SingleInputGate inputGate = mock(SingleInputGate.class);
 			when(inputGate.getConsumedResultId()).thenReturn(resultId);
 
@@ -888,19 +886,15 @@ public class TaskTest extends TestLogger {
 	}
 
 	/**
-	 * Creates a {@link BlobCache} mock that is suitable to be used in the tests above.
+	 * Creates a {@link BlobCacheService} mock that is suitable to be used in the tests above.
 	 *
 	 * @return BlobCache mock with the bare minimum of implemented functions that work
 	 */
-	private BlobCache createBlobCache() {
-		BlobCache blobCache = mock(BlobCache.class);
-		PermanentBlobCache permanentBlobCache = mock(PermanentBlobCache.class);
-		TransientBlobCache transientBlobCache = mock(TransientBlobCache.class);
+	private BlobCacheService createBlobCache() {
+		BlobCacheService blobService =
+			new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));
 
-		when(blobCache.getPermanentBlobStore()).thenReturn(permanentBlobCache);
-		when(blobCache.getTransientBlobStore()).thenReturn(transientBlobCache);
-
-		return blobCache;
+		return blobService;
 	}
 
 	private Task createTask(Class<? extends AbstractInvokable> invokable) throws IOException {
@@ -908,30 +902,30 @@ public class TaskTest extends TestLogger {
 	}
 
 	private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config) throws IOException {
-		BlobCache blobCache = createBlobCache();
+		BlobCacheService blobService = createBlobCache();
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-		return createTask(invokable, blobCache,libCache, config, new ExecutionConfig());
+		return createTask(invokable, blobService,libCache, config, new ExecutionConfig());
 	}
 
 	private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config, ExecutionConfig execConfig) throws IOException {
-		BlobCache blobCache = createBlobCache();
+		BlobCacheService blobService = createBlobCache();
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-		return createTask(invokable, blobCache,libCache, config, execConfig);
+		return createTask(invokable, blobService,libCache, config, execConfig);
 	}
 
 	private Task createTask(
 			Class<? extends AbstractInvokable> invokable,
-			BlobCache blobCache,
+			BlobCacheService blobService,
 			LibraryCacheManager libCache) throws IOException {
 
-		return createTask(invokable, blobCache,libCache, new Configuration(), new ExecutionConfig());
+		return createTask(invokable, blobService,libCache, new Configuration(), new ExecutionConfig());
 	}
 
 	private Task createTask(
 			Class<? extends AbstractInvokable> invokable,
-			BlobCache blobCache,
+			BlobCacheService blobService,
 			LibraryCacheManager libCache,
 			Configuration config,
 			ExecutionConfig execConfig) throws IOException {
@@ -946,23 +940,23 @@ public class TaskTest extends TestLogger {
 		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 				.thenReturn(mock(TaskKvStateRegistry.class));
 
-		return createTask(invokable, blobCache, libCache, network, consumableNotifier, partitionProducerStateChecker, executor, config, execConfig);
+		return createTask(invokable, blobService, libCache, network, consumableNotifier, partitionProducerStateChecker, executor, config, execConfig);
 	}
 
 	private Task createTask(
 			Class<? extends AbstractInvokable> invokable,
-			BlobCache blobCache,
+			BlobCacheService blobService,
 			LibraryCacheManager libCache,
 			NetworkEnvironment networkEnvironment,
 			ResultPartitionConsumableNotifier consumableNotifier,
 			PartitionProducerStateChecker partitionProducerStateChecker,
 			Executor executor) throws IOException {
-		return createTask(invokable, blobCache, libCache, networkEnvironment, consumableNotifier, partitionProducerStateChecker, executor, new Configuration(), new ExecutionConfig());
+		return createTask(invokable, blobService, libCache, networkEnvironment, consumableNotifier, partitionProducerStateChecker, executor, new Configuration(), new ExecutionConfig());
 	}
 	
 	private Task createTask(
 		Class<? extends AbstractInvokable> invokable,
-		BlobCache blobCache,
+		BlobCacheService blobService,
 		LibraryCacheManager libCache,
 		NetworkEnvironment networkEnvironment,
 		ResultPartitionConsumableNotifier consumableNotifier,
@@ -991,8 +985,8 @@ public class TaskTest extends TestLogger {
 			"Test Job",
 			serializedExecutionConfig,
 			new Configuration(),
-			Collections.<BlobKey>emptyList(),
-			Collections.<URL>emptyList());
+			Collections.emptyList(),
+			Collections.emptyList());
 
 		TaskInformation taskInformation = new TaskInformation(
 			jobVertexId,
@@ -1023,7 +1017,7 @@ public class TaskTest extends TestLogger {
 			taskManagerConnection,
 			inputSplitProvider,
 			checkpointResponder,
-			blobCache,
+			blobService,
 			libCache,
 			mock(FileCache.class),
 			new TestingTaskManagerRuntimeInfo(taskManagerConfig),

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index 80f86f4..57a3831 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -24,8 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.runtime.blob.BlobCache;
-import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -66,7 +65,6 @@ import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
 
-import java.net.URL;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -148,7 +146,7 @@ public class JvmExitOnFatalErrorTest {
 
 				final JobInformation jobInformation = new JobInformation(
 						jid, "Test Job", execConfig, new Configuration(),
-						Collections.<BlobKey>emptyList(), Collections.<URL>emptyList());
+						Collections.emptyList(), Collections.emptyList());
 
 				final TaskInformation taskInformation = new TaskInformation(
 						jobVertexId, "Test Task", 1, 1, OomInvokable.class.getName(), new Configuration());
@@ -163,12 +161,8 @@ public class JvmExitOnFatalErrorTest {
 
 				final Executor executor = Executors.newCachedThreadPool();
 
-				BlobCache blobCache = mock(BlobCache.class);
-				PermanentBlobCache permanentBlobCache = mock(PermanentBlobCache.class);
-				TransientBlobCache transientBlobCache = mock(TransientBlobCache.class);
-
-				when(blobCache.getPermanentBlobStore()).thenReturn(permanentBlobCache);
-				when(blobCache.getTransientBlobStore()).thenReturn(transientBlobCache);
+				BlobCacheService blobService =
+					new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));
 
 				Task task = new Task(
 						jobInformation,
@@ -188,7 +182,7 @@ public class JvmExitOnFatalErrorTest {
 						new NoOpTaskManagerActions(),
 						new NoOpInputSplitProvider(),
 						new NoOpCheckpointResponder(),
-						blobCache,
+					blobService,
 						new FallbackLibraryCacheManager(),
 						new FileCache(tmInfo.getTmpDirectories()),
 						tmInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 309f24d..edea3f3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionEdge;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -48,7 +47,6 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -154,8 +152,8 @@ public class RescalePartitionerTest extends TestLogger {
 			AkkaUtils.getDefaultTimeout(),
 			new NoRestartStrategy(),
 			new RestartAllStrategy.Factory(),
-			new ArrayList<BlobKey>(),
-			new ArrayList<URL>(),
+			new ArrayList<>(),
+			new ArrayList<>(),
 			new Scheduler(TestingUtils.defaultExecutionContext()),
 			ExecutionGraph.class.getClassLoader());
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index d1554b3..b43f30a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -24,8 +24,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.blob.BlobCache;
-import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -75,7 +74,6 @@ import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.net.URL;
 import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
@@ -126,8 +124,8 @@ public class BlockingCheckpointsTest {
 				"test job name",
 				new SerializedValue<>(new ExecutionConfig()),
 				new Configuration(),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList());
+				Collections.emptyList(),
+				Collections.emptyList());
 
 		TaskInformation taskInformation = new TaskInformation(
 				new JobVertexID(),
@@ -141,12 +139,8 @@ public class BlockingCheckpointsTest {
 		NetworkEnvironment network = mock(NetworkEnvironment.class);
 		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))).thenReturn(mockKvRegistry);
 
-		BlobCache blobCache = mock(BlobCache.class);
-		PermanentBlobCache permanentBlobCache = mock(PermanentBlobCache.class);
-		TransientBlobCache transientBlobCache = mock(TransientBlobCache.class);
-
-		when(blobCache.getPermanentBlobStore()).thenReturn(permanentBlobCache);
-		when(blobCache.getTransientBlobStore()).thenReturn(transientBlobCache);
+		BlobCacheService blobService =
+			new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));
 
 		return new Task(
 				jobInformation,
@@ -166,7 +160,7 @@ public class BlockingCheckpointsTest {
 				mock(TaskManagerActions.class),
 				mock(InputSplitProvider.class),
 				mock(CheckpointResponder.class),
-				blobCache,
+			blobService,
 				new FallbackLibraryCacheManager(),
 				new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
 				new TestingTaskManagerRuntimeInfo(),

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 1095316..19823ec 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -24,8 +24,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.blob.BlobCache;
-import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -76,7 +75,6 @@ import org.junit.Test;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.net.URL;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -231,8 +229,8 @@ public class InterruptSensitiveRestoreTest {
 			"test job name",
 			new SerializedValue<>(new ExecutionConfig()),
 			new Configuration(),
-			Collections.<BlobKey>emptyList(),
-			Collections.<URL>emptyList());
+			Collections.emptyList(),
+			Collections.emptyList());
 
 		TaskInformation taskInformation = new TaskInformation(
 			jobVertexID,
@@ -242,12 +240,8 @@ public class InterruptSensitiveRestoreTest {
 			SourceStreamTask.class.getName(),
 			taskConfig);
 
-		BlobCache blobCache = mock(BlobCache.class);
-		PermanentBlobCache permanentBlobCache = mock(PermanentBlobCache.class);
-		TransientBlobCache transientBlobCache = mock(TransientBlobCache.class);
-
-		when(blobCache.getPermanentBlobStore()).thenReturn(permanentBlobCache);
-		when(blobCache.getTransientBlobStore()).thenReturn(transientBlobCache);
+		BlobCacheService blobService =
+			new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));
 
 		return new Task(
 			jobInformation,
@@ -267,7 +261,7 @@ public class InterruptSensitiveRestoreTest {
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
-			blobCache,
+			blobService,
 			new FallbackLibraryCacheManager(),
 			new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
 			new TestingTaskManagerRuntimeInfo(),

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 9e0468b..a989fa4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -23,8 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.blob.BlobCache;
-import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -74,7 +73,6 @@ import org.junit.Test;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.net.URL;
 import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
@@ -122,8 +120,8 @@ public class StreamTaskTerminationTest extends TestLogger {
 			"Test Job",
 			new SerializedValue<>(new ExecutionConfig()),
 			new Configuration(),
-			Collections.<BlobKey>emptyList(),
-			Collections.<URL>emptyList());
+			Collections.emptyList(),
+			Collections.emptyList());
 
 		final TaskInformation taskInformation = new TaskInformation(
 			new JobVertexID(),
@@ -138,12 +136,8 @@ public class StreamTaskTerminationTest extends TestLogger {
 		final NetworkEnvironment networkEnv = mock(NetworkEnvironment.class);
 		when(networkEnv.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))).thenReturn(mock(TaskKvStateRegistry.class));
 
-		BlobCache blobCache = mock(BlobCache.class);
-		PermanentBlobCache permanentBlobCache = mock(PermanentBlobCache.class);
-		TransientBlobCache transientBlobCache = mock(TransientBlobCache.class);
-
-		when(blobCache.getPermanentBlobStore()).thenReturn(permanentBlobCache);
-		when(blobCache.getTransientBlobStore()).thenReturn(transientBlobCache);
+		BlobCacheService blobService =
+			new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));
 
 		final Task task = new Task(
 			jobInformation,
@@ -163,7 +157,7 @@ public class StreamTaskTerminationTest extends TestLogger {
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
-			blobCache,
+			blobService,
 			new FallbackLibraryCacheManager(),
 			mock(FileCache.class),
 			taskManagerRuntimeInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 100c16b..525b351 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -27,8 +27,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.blob.BlobCache;
-import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -110,7 +109,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.ObjectInputStream;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -860,12 +858,8 @@ public class StreamTaskTest extends TestLogger {
 			StreamConfig taskConfig,
 			Configuration taskManagerConfig) throws Exception {
 
-		BlobCache blobCache = mock(BlobCache.class);
-		PermanentBlobCache permanentBlobCache = mock(PermanentBlobCache.class);
-		TransientBlobCache transientBlobCache = mock(TransientBlobCache.class);
-
-		when(blobCache.getPermanentBlobStore()).thenReturn(permanentBlobCache);
-		when(blobCache.getTransientBlobStore()).thenReturn(transientBlobCache);
+		BlobCacheService blobService =
+			new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class));
 
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(StreamTaskTest.class.getClassLoader());
@@ -886,8 +880,8 @@ public class StreamTaskTest extends TestLogger {
 			"Job Name",
 			new SerializedValue<>(new ExecutionConfig()),
 			new Configuration(),
-			Collections.<BlobKey>emptyList(),
-			Collections.<URL>emptyList());
+			Collections.emptyList(),
+			Collections.emptyList());
 
 		TaskInformation taskInformation = new TaskInformation(
 			new JobVertexID(),
@@ -915,7 +909,7 @@ public class StreamTaskTest extends TestLogger {
 			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
-			blobCache,
+			blobService,
 			libCache,
 			mock(FileCache.class),
 			new TestingTaskManagerRuntimeInfo(taskManagerConfig, new String[] {System.getProperty("java.io.tmpdir")}),


Mime
View raw message