flink-commits mailing list archives

From trohrm...@apache.org
Subject [5/5] flink git commit: [FLINK-7057][blob] move ref-counting from the LibraryCacheManager to the BlobCache
Date Fri, 18 Aug 2017 07:36:50 GMT
[FLINK-7057][blob] move ref-counting from the LibraryCacheManager to the BlobCache

Also change from BlobKey-based ref-counting to job-based ref-counting, which is
simpler and the mode we want to use from now on. Deferred cleanup (as before)
is not implemented yet (TODO).
At the BlobServer, no ref-counting will be used; instead, the cleanup will
happen when the job enters a final state (TODO).

[FLINK-7057][blob] change to a cleaner API for BlobService#registerJob()

[FLINK-7057][blob] implement deferred cleanup at the BlobCache

Whenever a job is not referenced at the BlobCache anymore, we set a TTL and let
the periodic cleanup task remove the job's files once the TTL has expired and the
task runs. For now, this means that a BLOB will be retained at most
(2 * ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) seconds after it is
no longer referenced. We do this so that a recovery still has the chance to
use existing files rather than download them again.
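The job-based ref-counting with a deferred-cleanup TTL described above can be sketched as follows. This is a minimal, hypothetical illustration (`JobRefCounterSketch`, `register`, `release` and `expiredJobs` are not Flink names): job IDs are plain strings instead of Flink's `JobID`, the current time is passed in explicitly so the logic is deterministic, and unlike the commit (which only logs a warning) an unmatched `release()` throws.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: per-job reference counts with a time-to-live after the last release. */
final class JobRefCounterSketch {

    /** Reference count plus the time (ms) after which the job's files may be deleted; -1 = keep. */
    private static final class RefCount {
        int references = 0;
        long keepUntil = -1;
    }

    private final Map<String, RefCount> jobRefCounters = new HashMap<>();
    private final long ttlMillis;

    JobRefCounterSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Increments the job's reference count (creating the entry on first use). */
    void register(String jobId) {
        synchronized (jobRefCounters) {
            RefCount ref = jobRefCounters.computeIfAbsent(jobId, id -> new RefCount());
            ++ref.references;
        }
    }

    /** Decrements the count; when it reaches zero, the job becomes eligible for cleanup after the TTL. */
    void release(String jobId, long nowMillis) {
        synchronized (jobRefCounters) {
            RefCount ref = jobRefCounters.get(jobId);
            if (ref == null || ref.references == 0) {
                throw new IllegalStateException("release() without matching register() for " + jobId);
            }
            if (--ref.references == 0) {
                ref.keepUntil = nowMillis + ttlMillis;
            }
        }
    }

    /** Called periodically: removes and returns all unreferenced jobs whose TTL has passed. */
    List<String> expiredJobs(long nowMillis) {
        List<String> expired = new ArrayList<>();
        synchronized (jobRefCounters) {
            Iterator<Map.Entry<String, RefCount>> it = jobRefCounters.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, RefCount> entry = it.next();
                RefCount ref = entry.getValue();
                if (ref.references == 0 && ref.keepUntil > 0 && nowMillis >= ref.keepUntil) {
                    expired.add(entry.getKey()); // the caller would delete this job's directory here
                    it.remove();
                }
            }
        }
        return expired;
    }
}
```

A job registered by two tasks survives until both have released it; after the last release it is only reported by the first cleanup run at or after `keepUntil`, and a re-registration in between keeps it alive because the reference count is checked first.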

[FLINK-7057][blob] integrate cleanup of job-related JARs from the BlobServer

TODO: add an integration test that verifies that this cleanup is actually done
when desired and not performed otherwise, e.g. if the job did not reach a final
execution state.

[FLINK-7057][tests] extract FailingBlockingInvokable from CoordinatorShutdownTest

[FLINK-7057][blob] add an integration test for the BlobServer cleanup

This ensures that BLOB files are actually deleted when a job enters a final
state.

[FLINK-7057][tests] refrain from catching an exception just to fail the test

Removes code like this from the BLOB store unit tests:

catch (Exception e) {
    e.printStackTrace();
    fail(e.getMessage());
}

[FLINK-7057][blob] fix BlobServer#cleanupJob() being too eager

Instead of deleting the job's directory, it was deleting the parent storage
directory.
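The fix amounts to deleting the job's own subdirectory instead of its parent. A minimal sketch of a guarded recursive delete with `java.nio.file`, assuming for illustration a hypothetical layout of `<storageDir>/job_<jobId>`; the guard refuses to delete anything that is not strictly below the storage root, which is exactly the mistake described above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch: delete one job's BLOB directory, never the storage root itself. */
final class JobDirCleanupSketch {

    /** Assumed (hypothetical) layout: <storageDir>/job_<jobId>. */
    static Path jobDirectory(Path storageDir, String jobId) {
        return storageDir.resolve("job_" + jobId);
    }

    static void cleanupJob(Path storageDir, String jobId) throws IOException {
        Path root = storageDir.toAbsolutePath().normalize();
        Path jobDir = jobDirectory(root, jobId).normalize();

        // Guard against the "too eager" bug: only ever delete a strict child of the storage root.
        if (jobDir.equals(root) || !jobDir.startsWith(root)) {
            throw new IllegalArgumentException("refusing to delete " + jobDir);
        }
        if (!Files.exists(jobDir)) {
            return; // nothing to clean up, so the call is idempotent
        }
        List<Path> toDelete;
        try (Stream<Path> paths = Files.walk(jobDir)) {
            // Reverse order puts children before their parent directories.
            toDelete = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : toDelete) {
            Files.delete(p);
        }
    }
}
```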

[FLINK-7057][blob] fix BlobServer cleanup integration

* the test did not check the correct directories for cleanup
* the test did not honour the test timeout

[FLINK-7057][blob] test and fix BlobServer cleanup for a failed job submission

[FLINK-7057][blob] rework the LibraryCacheManager API

Since ref-counting has moved to the BlobCache, the BlobLibraryCacheManager is
just a thin wrapper to get a user class loader by retrieving BLOBs from the
BlobCache/BlobServer. Therefore, move the job-registration/-release out of it,
too, and restrict its use to the task manager where the BlobCache is used (on
the BlobServer, jobs do not need registration since they are only used once and
their BLOBs are deleted when they enter a final state).

This makes the BlobServer and BlobCache instances available at the JobManager
and TaskManager instances, respectively, also enabling future use cases outside
of the LibraryCacheManager.

[FLINK-7057][blob] address PR comments

[FLINK-7057][blob] fix JobManagerLeaderElectionTest

[FLINK-7057][blob] re-introduce some ref-counting for BlobLibraryCacheManager

Apparently, we do need to return the same ClassLoader for different (parallel)
tasks of a job running on the same task manager. Therefore, keep the initial
task registration implementation that was removed with
8331fbb208d975e0c1ec990344c14315ea08dd4a and only adapt it here. This also
restores some tests and adds new combinations not tested before.
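Returning the same class loader to all parallel tasks of a job on one task manager can be sketched as a per-job cache that also tracks which tasks registered. All names here are hypothetical, job and task IDs are plain strings, and the loader is a plain `URLClassLoader` over the job's JAR URLs; this is an illustration of the idea, not Flink's actual user-code class loader handling.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: one class loader per job, shared by all of its registered tasks. */
final class JobClassLoaderCacheSketch {

    private static final class Entry {
        final URLClassLoader classLoader;
        final Set<String> tasks = new HashSet<>();

        Entry(URL[] jarUrls) {
            this.classLoader = new URLClassLoader(jarUrls, JobClassLoaderCacheSketch.class.getClassLoader());
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /** Registers a task and returns the job's shared class loader, creating it on first use. */
    synchronized ClassLoader registerTask(String jobId, String taskId, URL[] jarUrls) {
        Entry entry = entries.computeIfAbsent(jobId, id -> new Entry(jarUrls));
        entry.tasks.add(taskId);
        return entry.classLoader;
    }

    /** Unregisters a task; closes and drops the class loader once the job has no tasks left. */
    synchronized void unregisterTask(String jobId, String taskId) throws IOException {
        Entry entry = entries.get(jobId);
        if (entry == null || !entry.tasks.remove(taskId)) {
            return; // unknown job or task: nothing to do
        }
        if (entry.tasks.isEmpty()) {
            entries.remove(jobId);
            entry.classLoader.close();
        }
    }

    synchronized int numTasks(String jobId) {
        Entry entry = entries.get(jobId);
        return entry == null ? 0 : entry.tasks.size();
    }
}
```

Tracking task IDs in a set rather than a bare counter makes a duplicate unregistration harmless instead of releasing the loader early.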

[FLINK-7057][blob] address PR comments

[FLINK-7057][tests] fix (manual/ignored) BlobCacheCleanupTest#testJobDeferredCleanup()

[FLINK-7057][hotfix] fix a checkstyle error

[FLINK-7057][blob] remove the extra lock object from BlobCache

We can lock on jobRefCounters instead, which is what we are guarding anyway.

[FLINK-7057][blob] minor improvements to the TTL in BlobCache

Do not use Long.MAX_VALUE as a code for "keep forever". Also add more comments.

[FLINK-7057][blob] replace "library-cache-manager.cleanup.interval" with "blob.service.cleanup.interval"

Since we moved the cleanup to the BLOB service classes, the new key name better
reflects where the cleanup happens.
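In the diff, the old key stays usable through `withDeprecatedKeys("library-cache-manager.cleanup.interval")` on `BlobServerOptions.CLEANUP_INTERVAL`. A minimal sketch of the same lookup order over a plain `java.util.Properties` (the helper is hypothetical and is not Flink's `Configuration`): new key first, then the deprecated key, then the 3600-second default, converted to milliseconds as `BlobCache` does.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a config lookup with a deprecated fallback key. */
final class CleanupIntervalLookupSketch {

    static final String KEY = "blob.service.cleanup.interval";
    static final String DEPRECATED_KEY = "library-cache-manager.cleanup.interval";
    static final long DEFAULT_SECONDS = 3_600L; // once per hour

    /** Returns the cleanup interval in milliseconds. */
    static long cleanupIntervalMillis(Properties conf) {
        String value = conf.getProperty(KEY);
        if (value == null) {
            value = conf.getProperty(DEPRECATED_KEY); // old configurations keep working
        }
        long seconds = (value == null) ? DEFAULT_SECONDS : Long.parseLong(value.trim());
        return TimeUnit.SECONDS.toMillis(seconds);
    }
}
```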

[FLINK-7057][hotfix] remove an unused import

[FLINK-7057][docs] adapt javadocs of JobManager descendants

[FLINK-7057][blob] increase JobManagerCleanupITCase timeout

The previous value of 15s seems to be too low for some runs on Travis.

[FLINK-7057][blob] provide more debug output in JobManagerCleanupITCase

In case the BlobServer's directory is not cleaned within the remaining time,
also print which files remain. This may help with debugging.
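The check described above can be sketched as polling a directory until no files remain or a deadline passes, and naming the leftover files on timeout. The helper name and the 50 ms poll interval are hypothetical and not taken from `JobManagerCleanupITCase`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch: wait for a directory tree to lose all files, listing leftovers on timeout. */
final class DirectoryCleanupWaiter {

    /** Lists all regular files below {@code dir} (empty if the directory does not exist). */
    static List<Path> remainingFiles(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return Collections.emptyList();
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            return paths.filter(Files::isRegularFile).collect(Collectors.toList());
        }
    }

    static void awaitEmpty(Path dir, long timeoutMillis) throws IOException, InterruptedException {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        List<Path> remaining = remainingFiles(dir);
        while (!remaining.isEmpty() && deadline - System.nanoTime() > 0) {
            Thread.sleep(50); // poll interval (arbitrary for this sketch)
            remaining = remainingFiles(dir);
        }
        if (!remaining.isEmpty()) {
            // Report which files are still there, not just that the cleanup did not finish.
            throw new AssertionError("Directory " + dir + " not cleaned up in time; remaining files: " + remaining);
        }
    }
}
```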

This closes #4238.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b236240
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b236240
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b236240

Branch: refs/heads/master
Commit: 7b23624066c46d58c7b7181e5576a9834af9ac7a
Parents: 9c80d40
Author: Nico Kruber <nico@data-artisans.com>
Authored: Tue Jun 27 18:29:44 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Fri Aug 18 09:29:32 2017 +0200

----------------------------------------------------------------------
 docs/ops/config.md                              |   7 +
 .../flink/configuration/BlobServerOptions.java  |  16 +-
 .../flink/configuration/ConfigConstants.java    |   9 +-
 .../clusterframework/MesosJobManager.scala      |   8 +-
 .../apache/flink/runtime/blob/BlobCache.java    | 140 ++++-
 .../apache/flink/runtime/blob/BlobClient.java   |  21 +-
 .../apache/flink/runtime/blob/BlobServer.java   |  40 +-
 .../runtime/blob/BlobServerConnection.java      |  18 +-
 .../apache/flink/runtime/blob/BlobService.java  |   5 +-
 .../apache/flink/runtime/blob/BlobUtils.java    |  25 +-
 .../apache/flink/runtime/client/JobClient.java  |   3 +-
 .../flink/runtime/dispatcher/Dispatcher.java    |   3 +-
 .../dispatcher/StandaloneDispatcher.java        |   5 +-
 .../entrypoint/JobClusterEntrypoint.java        |   3 +-
 .../librarycache/BlobLibraryCacheManager.java   | 327 ++++++-----
 .../FallbackLibraryCacheManager.java            |   8 +-
 .../librarycache/LibraryCacheManager.java       |  25 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |   2 +-
 .../runtime/jobmaster/JobManagerRunner.java     |   7 +-
 .../runtime/jobmaster/JobManagerServices.java   |  20 +-
 .../flink/runtime/jobmaster/JobMaster.java      |  14 +-
 .../taskexecutor/JobManagerConnection.java      |  32 +-
 .../runtime/taskexecutor/TaskExecutor.java      |  11 +-
 .../taskexecutor/TaskManagerConfiguration.java  |  12 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  13 +-
 .../ContaineredJobManager.scala                 |   6 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  48 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |   5 +
 .../flink/runtime/taskmanager/TaskManager.scala |  34 +-
 .../runtime/blob/BlobCacheCleanupTest.java      | 328 +++++++++++
 .../runtime/blob/BlobCacheRetriesTest.java      |   4 +-
 .../flink/runtime/blob/BlobClientTest.java      |  29 +-
 .../apache/flink/runtime/blob/BlobKeyTest.java  |   6 +-
 .../runtime/blob/BlobServerDeleteTest.java      |  85 ++-
 .../flink/runtime/blob/BlobUtilsTest.java       |   3 +-
 .../checkpoint/CoordinatorShutdownTest.java     |  23 +-
 .../runtime/dispatcher/DispatcherTest.java      |   3 +-
 .../BlobLibraryCacheManagerTest.java            | 540 +++++++++++++------
 .../BlobLibraryCacheRecoveryITCase.java         |  36 +-
 .../jobmanager/JobManagerCleanupITCase.java     | 300 +++++++++++
 .../jobmanager/JobManagerHARecoveryTest.java    |  20 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java |   5 +-
 .../flink/runtime/jobmaster/JobMasterTest.java  |   9 +-
 .../JobManagerLeaderElectionTest.java           |  11 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |   5 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   3 +
 .../flink/runtime/taskmanager/TaskStopTest.java |   2 +
 .../flink/runtime/taskmanager/TaskTest.java     |  35 +-
 .../testtasks/FailingBlockingInvokable.java     |  48 ++
 .../runtime/util/JvmExitOnFatalErrorTest.java   |   2 +
 .../jobmanager/JobManagerRegistrationTest.scala |   5 +-
 .../runtime/testingUtils/TestingCluster.scala   |   3 +
 .../testingUtils/TestingJobManager.scala        |   9 +-
 .../runtime/tasks/BlockingCheckpointsTest.java  |   2 +
 .../tasks/InterruptSensitiveRestoreTest.java    |   2 +
 .../tasks/StreamTaskTerminationTest.java        |   2 +
 .../streaming/runtime/tasks/StreamTaskTest.java |   3 +
 .../flink/yarn/TestingYarnJobManager.scala      |   3 +
 .../org/apache/flink/yarn/YarnJobManager.scala  |   6 +-
 59 files changed, 1759 insertions(+), 640 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 4138b4d..e0b9d4d 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -196,6 +196,13 @@ will be used under the directory specified by jobmanager.web.tmpdir.
 
 - `blob.storage.directory`: Directory for storing blobs (such as user JARs) on the TaskManagers.
 
+- `blob.service.cleanup.interval`: Cleanup interval (in seconds) of the blob caches (DEFAULT: 1 hour).
+Whenever a job is not referenced at the cache anymore, we set a TTL and let the periodic cleanup task
+(executed every `blob.service.cleanup.interval` seconds) remove its blob files after this TTL has passed.
+This means that a blob will be retained at most <tt>2 * `blob.service.cleanup.interval`</tt> seconds after
+not being referenced anymore. Therefore, a recovery still has the chance to use existing files rather
+than to download them again.
+
 - `blob.server.port`: Port definition for the blob server (serving user JARs) on the TaskManagers. By default the port is set to 0, which means that the operating system is picking an ephemeral port. Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running on the same machine.
 
 - `blob.service.ssl.enabled`: Flag to enable ssl for the blob client/server communication. This is applicable only when the global ssl flag security.ssl.enabled is set to true (DEFAULT: true).
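The "at most 2 * `blob.service.cleanup.interval`" bound follows from the timing: a blob whose last reference is released at time t gets a deadline of t + I, but it is only removed by the first periodic run at or after that deadline, which can come up to I later. A small sketch computing that deletion time, assuming (as with the `Timer` scheduling in this commit) that runs happen at I, 2I, 3I, ... after startup; the helper name is hypothetical.

```java
/** Hypothetical sketch: when does a periodic cleanup with TTL == interval remove a released blob? */
final class RetentionBoundSketch {

    /**
     * @param releaseTime time (seconds since startup) at which the last reference was released
     * @param interval    cleanup interval in seconds; runs happen at interval, 2*interval, ...
     * @return the time of the cleanup run that deletes the blob
     */
    static long deletionTime(long releaseTime, long interval) {
        if (releaseTime < 0 || interval <= 0) {
            throw new IllegalArgumentException("releaseTime >= 0 and interval > 0 required");
        }
        long keepUntil = releaseTime + interval;           // TTL set on the last release
        long runs = (keepUntil + interval - 1) / interval; // ceil(keepUntil / interval)
        return runs * interval;                            // first run at or after keepUntil
    }

    public static void main(String[] args) {
        long interval = 3_600;
        for (long release : new long[] {0, 1, 1_800, 3_599}) {
            long retained = deletionTime(release, interval) - release;
            System.out.println("released at " + release + "s -> retained " + retained + "s");
        }
    }
}
```

Under these assumptions the retention always lies in the half-open range [I, 2I), so the documented bound holds; timer drift under load could stretch it slightly.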

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
index e27c29f..019580a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import static org.apache.flink.configuration.ConfigOptions.key;
 
 /**
- * Configuration options for the BlobServer.
+ * Configuration options for the BlobServer and BlobCache.
  */
 @PublicEvolving
 public class BlobServerOptions {
@@ -73,4 +73,18 @@ public class BlobServerOptions {
 	public static final ConfigOption<Boolean> SSL_ENABLED =
 		key("blob.service.ssl.enabled")
 			.defaultValue(true);
+
+	/**
+	 * Cleanup interval of the blob caches at the task managers (in seconds).
+	 *
+	 * <p>Whenever a job is not referenced at the cache anymore, we set a TTL and let the periodic
+	 * cleanup task (executed every CLEANUP_INTERVAL seconds) remove its blob files after this TTL
+	 * has passed. This means that a blob will be retained at most <tt>2 * CLEANUP_INTERVAL</tt>
+	 * seconds after not being referenced anymore. Therefore, a recovery still has the chance to use
+	 * existing files rather than to download them again.
+	 */
+	public static final ConfigOption<Long> CLEANUP_INTERVAL =
+		key("blob.service.cleanup.interval")
+			.defaultValue(3_600L) // once per hour
+			.withDeprecatedKeys("library-cache-manager.cleanup.interval");
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 4c6c62a..4153e45 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -178,7 +178,10 @@ public final class ConfigConstants {
 
 	/**
 	 * The config parameter defining the cleanup interval of the library cache manager.
+	 *
+	 * @deprecated use {@link BlobServerOptions#CLEANUP_INTERVAL} instead
 	 */
+	@Deprecated
 	public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = "library-cache-manager.cleanup.interval";
 
 	/**
@@ -1253,8 +1256,12 @@ public final class ConfigConstants {
 
 	/**
 	 * The default library cache manager cleanup interval in seconds
+	 *
+	 * @deprecated use {@link BlobServerOptions#CLEANUP_INTERVAL} instead
 	 */
-	public static final long DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = 3600;
+	@Deprecated
+	public static final long DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL =
+		BlobServerOptions.CLEANUP_INTERVAL.defaultValue();
 	
 	/**
 	 * The default network port to connect to for communication with the job manager.

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
index 3e7c55f..f854a1e 100644
--- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
+++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
+import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.ContaineredJobManager
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -34,7 +35,7 @@ import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
 
 import scala.concurrent.duration._
 
-/** JobManager actor for execution on Mesos. .
+/** JobManager actor for execution on Mesos.
   *
   * @param flinkConfiguration Configuration object for the actor
   * @param futureExecutor Execution context which is used to execute concurrent tasks in the
@@ -43,7 +44,8 @@ import scala.concurrent.duration._
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
-  * @param libraryCacheManager Manager to manage uploaded jar files
+  * @param blobServer BLOB store for file uploads
+  * @param libraryCacheManager manages uploaded jar files and class paths
   * @param archive Archive for finished Flink jobs
   * @param restartStrategyFactory Restart strategy to be used in case of a job recovery
   * @param timeout Timeout for futures
@@ -55,6 +57,7 @@ class MesosJobManager(
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
+    blobServer: BlobServer,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
@@ -70,6 +73,7 @@ class MesosJobManager(
     ioExecutor,
     instanceManager,
     scheduler,
+    blobServer,
     libraryCacheManager,
     archive,
     restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 29f7706..c50a888 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
@@ -25,7 +26,6 @@ import org.apache.flink.util.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -33,6 +33,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -47,7 +52,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * download it from a distributed file system (if available) or the BLOB
  * server.</p>
  */
-public final class BlobCache implements BlobService {
+public class BlobCache extends TimerTask implements BlobService {
 
 	/** The log object used for debugging. */
 	private static final Logger LOG = LoggerFactory.getLogger(BlobCache.class);
@@ -71,6 +76,32 @@ public final class BlobCache implements BlobService {
 	/** Configuration for the blob client like ssl parameters required to connect to the blob server */
 	private final Configuration blobClientConfig;
 
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Job reference counters with a time-to-live (TTL).
+	 */
+	private static class RefCount {
+		/**
+		 * Number of references to a job.
+		 */
+		public int references = 0;
+		
+		/**
+		 * Timestamp in milliseconds when any job data should be cleaned up (no cleanup for
+		 * non-positive values).
+		 */
+		public long keepUntil = -1;
+	}
+
+	/** Map to store the number of references to a specific job */
+	private final Map<JobID, RefCount> jobRefCounters = new HashMap<>();
+
+	/** Time interval (ms) to run the cleanup task; also used as the default TTL. */
+	private final long cleanupInterval;
+
+	private final Timer cleanupTimer;
+
 	/**
 	 * Instantiates a new BLOB cache.
 	 *
@@ -108,11 +139,63 @@ public final class BlobCache implements BlobService {
 			this.numFetchRetries = 0;
 		}
 
+		// Initializing the clean up task
+		this.cleanupTimer = new Timer(true);
+
+		cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
+		this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
+
 		// Add shutdown hook to delete storage directory
 		shutdownHook = BlobUtils.addShutdownHook(this, LOG);
 	}
 
 	/**
+	 * Registers use of job-related BLOBs.
+	 * <p>
+	 * Using any other method to access BLOBs, e.g. {@link #getFile}, is only valid within calls
+	 * to {@link #registerJob(JobID)} and {@link #releaseJob(JobID)}.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to
+	 *
+	 * @see #releaseJob(JobID)
+	 */
+	public void registerJob(JobID jobId) {
+		synchronized (jobRefCounters) {
+			RefCount ref = jobRefCounters.get(jobId);
+			if (ref == null) {
+				ref = new RefCount();
+				jobRefCounters.put(jobId, ref);
+			}
+			++ref.references;
+		}
+	}
+
+	/**
+	 * Unregisters use of job-related BLOBs and allows them to be released.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to
+	 *
+	 * @see #registerJob(JobID)
+	 */
+	public void releaseJob(JobID jobId) {
+		synchronized (jobRefCounters) {
+			RefCount ref = jobRefCounters.get(jobId);
+
+			if (ref == null) {
+				LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls");
+				return;
+			}
+
+			--ref.references;
+			if (ref.references == 0) {
+				ref.keepUntil = System.currentTimeMillis() + cleanupInterval;
+			}
+		}
+	}
+
+	/**
 	 * Returns local copy of the (job-unrelated) file for the BLOB with the given key.
 	 * <p>
 	 * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
@@ -148,7 +231,7 @@ public final class BlobCache implements BlobService {
 	 * 		Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
 	 */
 	@Override
-	public File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException {
+	public File getFile(JobID jobId, BlobKey key) throws IOException {
 		checkNotNull(jobId);
 		return getFileInternal(jobId, key);
 	}
@@ -258,7 +341,7 @@ public final class BlobCache implements BlobService {
 	 * @throws IOException
 	 */
 	@Override
-	public void delete(@Nonnull JobID jobId, BlobKey key) throws IOException {
+	public void delete(JobID jobId, BlobKey key) throws IOException {
 		checkNotNull(jobId);
 		deleteInternal(jobId, key);
 	}
@@ -307,7 +390,7 @@ public final class BlobCache implements BlobService {
 	 * 		thrown if an I/O error occurs while transferring the request to the BLOB server or if the
 	 * 		BLOB server cannot delete the file
 	 */
-	public void deleteGlobal(@Nonnull JobID jobId, BlobKey key) throws IOException {
+	public void deleteGlobal(JobID jobId, BlobKey key) throws IOException {
 		checkNotNull(jobId);
 		deleteGlobalInternal(jobId, key);
 	}
@@ -341,8 +424,40 @@ public final class BlobCache implements BlobService {
 		return serverAddress.getPort();
 	}
 
+	/**
+	 * Cleans up BLOBs which are not referenced anymore.
+	 */
+	@Override
+	public void run() {
+		synchronized (jobRefCounters) {
+			Iterator<Map.Entry<JobID, RefCount>> entryIter = jobRefCounters.entrySet().iterator();
+			final long currentTimeMillis = System.currentTimeMillis();
+
+			while (entryIter.hasNext()) {
+				Map.Entry<JobID, RefCount> entry = entryIter.next();
+				RefCount ref = entry.getValue();
+
+				if (ref.references <= 0 && ref.keepUntil > 0 && currentTimeMillis >= ref.keepUntil) {
+					JobID jobId = entry.getKey();
+
+					final File localFile =
+						new File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId));
+					try {
+						FileUtils.deleteDirectory(localFile);
+						// let's only remove this directory from cleanup if the cleanup was successful
+						entryIter.remove();
+					} catch (Throwable t) {
+						LOG.warn("Failed to locally delete job directory " + localFile.getAbsolutePath(), t);
+					}
+				}
+			}
+		}
+	}
+
 	@Override
 	public void close() throws IOException {
+		cleanupTimer.cancel();
+
 		if (shutdownRequested.compareAndSet(false, true)) {
 			LOG.info("Shutting down BlobCache");
 
@@ -369,8 +484,19 @@ public final class BlobCache implements BlobService {
 		return new BlobClient(serverAddress, blobClientConfig);
 	}
 
-	public File getStorageDir() {
-		return this.storageDir;
+	/**
+	 * Returns a file handle to the file associated with the given blob key on the blob
+	 * server.
+	 *
+	 * <p><strong>This is only called from the {@link BlobServerConnection}</strong>
+	 *
+	 * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+	 * @param key identifying the file
+	 * @return file handle to the file
+	 */
+	@VisibleForTesting
+	public File getStorageLocation(JobID jobId, BlobKey key) {
+		return BlobUtils.getStorageLocation(storageDir, jobId, key);
 	}
 
 }
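In this change `BlobCache` extends `TimerTask`, schedules itself on a daemon `java.util.Timer` with the cleanup interval as both initial delay and period, and cancels the timer in `close()`. A minimal sketch of the same wiring using composition instead (a hypothetical `PeriodicCleanup` wrapping a `Runnable`), which keeps `run()` off the service's public API; this is an alternative design, not what the commit does.

```java
import java.io.Closeable;
import java.util.Timer;
import java.util.TimerTask;

/** Hypothetical sketch: run a cleanup action periodically on a daemon Timer until closed. */
final class PeriodicCleanup implements Closeable {

    private final Timer cleanupTimer = new Timer("blob-cleanup", true); // daemon: does not block JVM exit

    PeriodicCleanup(Runnable cleanupAction, long intervalMillis) {
        cleanupTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    cleanupAction.run();
                } catch (RuntimeException e) {
                    // An exception escaping run() would terminate the Timer thread and stop all future runs.
                    System.err.println("cleanup failed: " + e);
                }
            }
        }, intervalMillis, intervalMillis);
    }

    @Override
    public void close() {
        cleanupTimer.cancel(); // stops future runs; a run already in progress may still finish
    }
}
```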

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 9a2f59e..8f1487a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -30,7 +30,6 @@ import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLParameters;
@@ -166,7 +165,7 @@ public final class BlobClient implements Closeable {
 	 * @throws IOException
 	 * 		if an I/O error occurs during the download
 	 */
-	public InputStream get(@Nonnull JobID jobId, BlobKey blobKey) throws IOException {
+	public InputStream get(JobID jobId, BlobKey blobKey) throws IOException {
 		checkNotNull(jobId);
 		return getInternal(jobId, blobKey);
 	}
@@ -339,7 +338,7 @@ public final class BlobClient implements Closeable {
 	 * 		thrown if an I/O error occurs while reading the data from the input stream or uploading the
 	 * 		data to the BLOB server
 	 */
-	public BlobKey put(@Nonnull JobID jobId, InputStream inputStream) throws IOException {
+	public BlobKey put(JobID jobId, InputStream inputStream) throws IOException {
 		checkNotNull(jobId);
 		return putInputStream(jobId, inputStream);
 	}
@@ -369,7 +368,7 @@ public final class BlobClient implements Closeable {
 		checkNotNull(value);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("PUT BLOB buffer ({} bytes) to {}.", len, socket.getLocalSocketAddress());
+			LOG.debug("PUT BLOB buffer (" + len + " bytes) to " + socket.getLocalSocketAddress() + ".");
 		}
 
 		try {
@@ -556,7 +555,7 @@ public final class BlobClient implements Closeable {
 	 * 		thrown if an I/O error occurs while transferring the request to the BLOB server or if the
 	 * 		BLOB server cannot delete the file
 	 */
-	public void delete(@Nonnull JobID jobId, BlobKey key) throws IOException {
+	public void delete(JobID jobId, BlobKey key) throws IOException {
 		checkNotNull(jobId);
 		deleteInternal(jobId, key);
 	}
@@ -603,23 +602,21 @@ public final class BlobClient implements Closeable {
 
 	/**
 	 * Uploads the JAR files to a {@link BlobServer} at the given address.
-	 * <p>
-	 * TODO: add jobId to signature after adapting the BlobLibraryCacheManager
 	 *
 	 * @param serverAddress
 	 * 		Server address of the {@link BlobServer}
 	 * @param clientConfig
 	 * 		Any additional configuration for the blob client
+	 * @param jobId
+	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
 	 * @param jars
 	 * 		List of JAR files to upload
 	 *
 	 * @throws IOException
 	 * 		if the upload fails
 	 */
-	public static List<BlobKey> uploadJarFiles(
-			InetSocketAddress serverAddress,
-			Configuration clientConfig,
-			List<Path> jars) throws IOException {
+	public static List<BlobKey> uploadJarFiles(InetSocketAddress serverAddress,
+			Configuration clientConfig, JobID jobId, List<Path> jars) throws IOException {checkNotNull(jobId);
 		if (jars.isEmpty()) {
 			return Collections.emptyList();
 		} else {
@@ -631,7 +628,7 @@ public final class BlobClient implements Closeable {
 					FSDataInputStream is = null;
 					try {
 						is = fs.open(jar);
-						final BlobKey key = blobClient.putInputStream(null, is);
+						final BlobKey key = blobClient.putInputStream(jobId, is);
 						blobKeys.add(key);
 					} finally {
 						if (is != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 43a060a..bfcf881 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
@@ -29,7 +30,6 @@ import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import java.io.File;
@@ -196,7 +196,8 @@ public class BlobServer extends Thread implements BlobService {
 	 * @param key identifying the file
 	 * @return file handle to the file
 	 */
-	File getStorageLocation(JobID jobId, BlobKey key) {
+	@VisibleForTesting
+	public File getStorageLocation(JobID jobId, BlobKey key) {
 		return BlobUtils.getStorageLocation(storageDir, jobId, key);
 	}
 
@@ -374,7 +375,7 @@ public class BlobServer extends Thread implements BlobService {
 	 * 		Thrown if the file retrieval failed.
 	 */
 	@Override
-	public File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException {
+	public File getFile(JobID jobId, BlobKey key) throws IOException {
 		checkNotNull(jobId);
 		return getFileInternal(jobId, key);
 	}
@@ -450,7 +451,7 @@ public class BlobServer extends Thread implements BlobService {
 	 * @throws IOException
 	 */
 	@Override
-	public void delete(@Nonnull JobID jobId, BlobKey key) throws IOException {
+	public void delete(JobID jobId, BlobKey key) throws IOException {
 		checkNotNull(jobId);
 		deleteInternal(jobId, key);
 	}
@@ -483,6 +484,37 @@ public class BlobServer extends Thread implements BlobService {
 	}
 
 	/**
+	 * Removes all BLOBs from local and HA store belonging to the given job ID.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to
+	 */
+	public void cleanupJob(JobID jobId) {
+		checkNotNull(jobId);
+
+		final File jobDir =
+			new File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId));
+
+		readWriteLock.writeLock().lock();
+
+		try {
+			// delete locally
+			try {
+				FileUtils.deleteDirectory(jobDir);
+			} catch (IOException e) {
+				LOG.warn("Failed to locally delete BLOB storage directory at " +
+					jobDir.getAbsolutePath(), e);
+			}
+
+			// delete in HA store
+			blobStore.deleteAll(jobId);
+		} finally {
+			readWriteLock.writeLock().unlock();
+		}
+	}
+
+
+	/**
 	 * Returns the port on which the server is listening.
 	 *
 	 * @return port on which the server is listening
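`cleanupJob()` takes the server's write lock so that no concurrent holder of the read lock observes a half-deleted job. A minimal sketch of that locking discipline with `ReentrantReadWriteLock`, using an in-memory map as a stand-in for the local and HA stores; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch: readers share the lock, per-job cleanup takes it exclusively. */
final class LockedBlobStoreSketch {

    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    /** jobId -> (blobKey -> contents); stands in for the local directory and the HA store. */
    private final Map<String, Map<String, byte[]>> blobsByJob = new HashMap<>();

    void put(String jobId, String key, byte[] data) {
        readWriteLock.writeLock().lock();
        try {
            blobsByJob.computeIfAbsent(jobId, id -> new HashMap<>()).put(key, data);
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }

    byte[] get(String jobId, String key) {
        readWriteLock.readLock().lock(); // many concurrent readers are fine
        try {
            Map<String, byte[]> blobs = blobsByJob.get(jobId);
            return blobs == null ? null : blobs.get(key);
        } finally {
            readWriteLock.readLock().unlock();
        }
    }

    /** Removes everything belonging to the job while no reader or writer can interleave. */
    void cleanupJob(String jobId) {
        readWriteLock.writeLock().lock();
        try {
            blobsByJob.remove(jobId);
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }
}
```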

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index f1054c0..7f617f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -139,14 +139,7 @@ class BlobServerConnection extends Thread {
 			LOG.error("Error while executing BLOB connection.", t);
 		}
 		finally {
-			try {
-				if (clientSocket != null) {
-					clientSocket.close();
-				}
-			} catch (Throwable t) {
-				LOG.debug("Exception while closing BLOB server connection socket.", t);
-			}
-
+			closeSilently(clientSocket, LOG);
 			blobServer.unregisterConnection(this);
 		}
 	}
@@ -433,9 +426,8 @@ class BlobServerConnection extends Thread {
 			final InputStream inputStream, final File incomingFile, final byte[] buf)
 			throws IOException {
 		MessageDigest md = BlobUtils.createMessageDigest();
-		FileOutputStream fos = new FileOutputStream(incomingFile);
 
-		try {
+		try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
 			while (true) {
 				final int bytesExpected = readLength(inputStream);
 				if (bytesExpected == -1) {
@@ -453,12 +445,6 @@ class BlobServerConnection extends Thread {
 				md.update(buf, 0, bytesExpected);
 			}
 			return new BlobKey(md.digest());
-		} finally {
-			try {
-				fos.close();
-			} catch (Throwable t) {
-				LOG.warn("Cannot close stream to BLOB staging file", t);
-			}
 		}
 	}
 
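The receive loop in `BlobServerConnection` now uses try-with-resources, so the staging file is closed on every exit path, including a failed read. The following is a self-contained sketch of the same loop under stated assumptions. Chunks are framed by a big-endian `int` length read through `DataInputStream` (Flink's own `readLength` encoding may differ). A length of `-1` ends the stream. SHA-1 stands in for the digest algorithm. The class name `ChunkedReceiver` is hypothetical.

```java
import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Sketch of a chunked receive loop that writes to a staging file and hashes the content. */
class ChunkedReceiver {
    private static final int BUFFER_SIZE = 4096;

    /**
     * Copies length-prefixed chunks from {@code in} into {@code target} and returns the digest
     * of all bytes written. Assumed framing: big-endian int length, -1 marks the end.
     */
    static byte[] receive(InputStream in, File target) throws IOException {
        MessageDigest md = newDigest();
        // not closed here: the caller owns the underlying (socket) stream
        DataInputStream din = new DataInputStream(in);
        byte[] buf = new byte[BUFFER_SIZE];

        // try-with-resources closes the staging file on success and on any exception
        try (FileOutputStream fos = new FileOutputStream(target)) {
            while (true) {
                int bytesExpected = din.readInt();
                if (bytesExpected == -1) {
                    break;
                }
                if (bytesExpected < 0 || bytesExpected > buf.length) {
                    throw new IOException("Unexpected chunk length " + bytesExpected);
                }
                din.readFully(buf, 0, bytesExpected);
                fos.write(buf, 0, bytesExpected);
                md.update(buf, 0, bytesExpected);
            }
        }
        return md.digest();
    }

    /** SHA-1 is an assumption for this sketch. */
    static MessageDigest newDigest() {
        try {
            return MessageDigest.getInstance("SHA-1");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }
}
```

Compared with the removed `finally { fos.close() }` block, an exception thrown by `close()` is no longer swallowed with a warning. If the loop itself already failed, that exception is attached to the primary one as a suppressed exception.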

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
index a78c88c..0db5a58 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
 
-import javax.annotation.Nonnull;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
@@ -50,7 +49,7 @@ public interface BlobService extends Closeable {
 	 * @throws java.io.FileNotFoundException when the path does not exist;
 	 * @throws IOException if any other error occurs when retrieving the file
 	 */
-	File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException;
+	File getFile(JobID jobId, BlobKey key) throws IOException;
 
 	/**
 	 * Deletes the (job-unrelated) file associated with the provided blob key.
@@ -67,7 +66,7 @@ public interface BlobService extends Closeable {
 	 * @param key associated with the file to be deleted
 	 * @throws IOException
 	 */
-	void delete(@Nonnull JobID jobId, BlobKey key) throws IOException;
+	void delete(JobID jobId, BlobKey key) throws IOException;
 
 	/**
 	 * Returns the port of the blob service.
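With `@Nonnull` removed from `BlobService`, null-safety for `jobId` relies on the runtime `checkNotNull(jobId)` calls that implementations such as `BlobServer#delete` keep. The JSR-305 annotation on its own is only a hint to static analysis tools and is not enforced at runtime. A minimal sketch of the runtime check follows. It uses `java.util.Objects.requireNonNull` in place of Flink's `Preconditions.checkNotNull`, and `JobScopedStore` is a hypothetical class:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical job-scoped store that rejects a null job ID at runtime. */
class JobScopedStore {
    private final Map<String, Map<String, byte[]>> blobsPerJob = new HashMap<>();

    void put(String jobId, String key, byte[] data) {
        Objects.requireNonNull(jobId, "jobId");
        blobsPerJob.computeIfAbsent(jobId, id -> new HashMap<>()).put(key, data);
    }

    /** Deletes one job-related blob; fails fast on a null job ID. */
    void delete(String jobId, String key) {
        Objects.requireNonNull(jobId, "jobId");
        Map<String, byte[]> blobs = blobsPerJob.get(jobId);
        if (blobs != null) {
            blobs.remove(key);
        }
    }

    boolean contains(String jobId, String key) {
        Map<String, byte[]> blobs = blobsPerJob.get(jobId);
        return blobs != null && blobs.containsKey(key);
    }
}
```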

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 9b5724b..dabd1bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -29,8 +29,8 @@ import org.apache.flink.util.StringUtils;
 
 import org.slf4j.Logger;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import java.io.Closeable;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
@@ -175,7 +175,7 @@ public class BlobUtils {
 	static File getIncomingDirectory(File storageDir) {
 		final File incomingDir = new File(storageDir, "incoming");
 
-		mkdirTolerateExisting(incomingDir, "incoming");
+		mkdirTolerateExisting(incomingDir);
 
 		return incomingDir;
 	}
@@ -185,15 +185,13 @@ public class BlobUtils {
 	 *
 	 * @param dir
 	 * 		directory to create
-	 * @param dirType
-	 * 		the type of the directory (included in error message if something fails)
 	 */
-	private static void mkdirTolerateExisting(final File dir, final String dirType) {
+	private static void mkdirTolerateExisting(final File dir) {
 		// note: thread-safe create should try to mkdir first and then ignore the case that the
 		//       directory already existed
 		if (!dir.mkdirs() && !dir.exists()) {
 			throw new RuntimeException(
-				"Cannot create " + dirType + " directory '" + dir.getAbsolutePath() + "'.");
+				"Cannot create directory '" + dir.getAbsolutePath() + "'.");
 		}
 	}
 
@@ -210,10 +208,10 @@ public class BlobUtils {
 	 * @return the (designated) physical storage location of the BLOB
 	 */
 	static File getStorageLocation(
-			@Nonnull File storageDir, @Nullable JobID jobId, @Nonnull BlobKey key) {
+			File storageDir, @Nullable JobID jobId, BlobKey key) {
 		File file = new File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));
 
-		mkdirTolerateExisting(file.getParentFile(), "cache");
+		mkdirTolerateExisting(file.getParentFile());
 
 		return file;
 	}
@@ -229,7 +227,7 @@ public class BlobUtils {
 	 *
 	 * @return the storage directory for BLOBs belonging to the job with the given ID
 	 */
-	static String getStorageLocationPath(@Nonnull String storageDir, @Nullable JobID jobId) {
+	static String getStorageLocationPath(String storageDir, @Nullable JobID jobId) {
 		if (jobId == null) {
 			// format: $base/no_job
 			return String.format("%s/%s", storageDir, NO_JOB_DIR_PREFIX);
@@ -256,7 +254,7 @@ public class BlobUtils {
 	 * @return the path to the given BLOB
 	 */
 	static String getStorageLocationPath(
-			@Nonnull String storageDir, @Nullable JobID jobId, @Nonnull BlobKey key) {
+			String storageDir, @Nullable JobID jobId, BlobKey key) {
 		if (jobId == null) {
 			// format: $base/no_job/blob_$key
 			return String.format("%s/%s/%s%s",
@@ -273,7 +271,6 @@ public class BlobUtils {
 	 *
 	 * @return a new instance of the message digest to use for the BLOB key computation
 	 */
-	@Nonnull
 	static MessageDigest createMessageDigest() {
 		try {
 			return MessageDigest.getInstance(HASHING_ALGORITHM);
@@ -285,7 +282,7 @@ public class BlobUtils {
 	/**
 	 * Adds a shutdown hook to the JVM and returns the Thread, which has been registered.
 	 */
-	static Thread addShutdownHook(final BlobService service, final Logger logger) {
+	static Thread addShutdownHook(final Closeable service, final Logger logger) {
 		checkNotNull(service);
 		checkNotNull(logger);
 
@@ -399,9 +396,7 @@ public class BlobUtils {
 			try {
 				socket.close();
 			} catch (Throwable t) {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Error while closing resource after BLOB transfer.", t);
-				}
+				LOG.debug("Exception while closing BLOB server connection socket.", t);
 			}
 		}
 	}
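`mkdirTolerateExisting` tolerates races because it calls `mkdirs()` first and accepts an already existing directory as success, instead of checking `exists()` before creating. The sketch below reproduces that check together with the path layout described in the `getStorageLocationPath` comments. The `job_` directory prefix is an assumption, because this hunk only shows the `no_job` branch. `StorageLayout` is a hypothetical class name.

```java
import java.io.File;

/** Sketch of the BLOB storage layout and a race-tolerant directory creation. */
class StorageLayout {
    static final String NO_JOB_DIR = "no_job";
    static final String JOB_DIR_PREFIX = "job_";   // assumed, not shown in the diff
    static final String BLOB_FILE_PREFIX = "blob_";

    /** Creates dir unless it already exists; safe if several threads race to create it. */
    static void mkdirTolerateExisting(File dir) {
        // create first, then accept the case that another thread created it in the meantime
        if (!dir.mkdirs() && !dir.exists()) {
            throw new RuntimeException("Cannot create directory '" + dir.getAbsolutePath() + "'.");
        }
    }

    /** Returns $base/no_job for job-unrelated BLOBs, else $base/job_$jobId. */
    static String jobDirPath(String storageDir, String jobId) {
        return jobId == null
            ? String.format("%s/%s", storageDir, NO_JOB_DIR)
            : String.format("%s/%s%s", storageDir, JOB_DIR_PREFIX, jobId);
    }

    /** Returns the path of a single BLOB file inside the job (or no_job) directory. */
    static String blobPath(String storageDir, String jobId, String key) {
        return String.format("%s/%s%s", jobDirPath(storageDir, jobId), BLOB_FILE_PREFIX, key);
    }
}
```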

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 9cc6210..425461c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -234,8 +234,7 @@ public class JobClient {
 			int pos = 0;
 			for (BlobKey blobKey : props.requiredJarFiles()) {
 				try {
-					// TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
-					allURLs[pos++] = blobClient.getFile(blobKey).toURI().toURL();
+					allURLs[pos++] = blobClient.getFile(jobID, blobKey).toURI().toURL();
 				} catch (Exception e) {
 					try {
 						blobClient.close();
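`JobClient` now resolves each required JAR through the job-scoped `getFile(jobID, blobKey)` and converts the returned local `File` into a class-path URL via `toURI().toURL()`. A minimal sketch of that conversion and of loading a resource through a `URLClassLoader` is shown below. Plain local files stand in for downloaded BLOBs, and `ClassPathBuilder` is a hypothetical helper.

```java
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;

/** Hypothetical helper that turns locally cached files into a class loader. */
class ClassPathBuilder {
    /** Converts local files (e.g. cached JARs or directories) into class-path URLs. */
    static URL[] toClassPath(List<File> files) throws IOException {
        URL[] urls = new URL[files.size()];
        int pos = 0;
        for (File f : files) {
            // File -> URI -> URL yields a properly escaped file: URL
            // (an existing directory gets a trailing '/', which URLClassLoader requires)
            urls[pos++] = f.toURI().toURL();
        }
        return urls;
    }

    /** Creates a class loader over the given files with the given parent. */
    static URLClassLoader newClassLoader(List<File> files, ClassLoader parent) throws IOException {
        return new URLClassLoader(toClassPath(files), parent);
    }
}
```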

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 9fc1fc4..bb0b3e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -231,7 +230,7 @@ public abstract class Dispatcher extends RpcEndpoint implements DispatcherGatewa
 		Configuration configuration,
 		RpcService rpcService,
 		HighAvailabilityServices highAvailabilityServices,
-		BlobService blobService,
+		BlobServer blobServer,
 		HeartbeatServices heartbeatServices,
 		MetricRegistry metricRegistry,
 		OnCompletionActions onCompleteActions,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 54d698e..dfd6a8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -65,7 +64,7 @@ public class StandaloneDispatcher extends Dispatcher {
 			Configuration configuration,
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityServices,
-			BlobService blobService,
+			BlobServer blobServer,
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
 			OnCompletionActions onCompleteActions,
@@ -77,7 +76,7 @@ public class StandaloneDispatcher extends Dispatcher {
 			configuration,
 			rpcService,
 			highAvailabilityServices,
-			blobService,
+			blobServer,
 			heartbeatServices,
 			metricRegistry,
 			onCompleteActions,
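The dispatchers now receive the concrete `BlobServer` instead of the `BlobService` interface. This gives them access to `cleanupJob(JobID)`, which is added to `BlobServer` only (it carries no `@Override`). According to the commit message, this cleanup should run once a job reaches a final state. Below is a hedged sketch of such a hook. `JobState`, `JobCleaner`, and `JobCleanupHook` are hypothetical names, not Flink API, and treating `SUSPENDED` as non-final is an assumption.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical job states; only the final ones trigger BLOB cleanup. */
enum JobState {
    CREATED, RUNNING, FAILING, FINISHED, CANCELED, FAILED, SUSPENDED;

    // SUSPENDED is deliberately excluded: the job may still be recovered (assumption)
    private static final Set<JobState> FINAL = EnumSet.of(FINISHED, CANCELED, FAILED);

    boolean isFinal() {
        return FINAL.contains(this);
    }
}

/** Stand-in for BlobServer#cleanupJob(JobID). */
interface JobCleaner {
    void cleanupJob(String jobId);
}

/** Calls the BLOB cleanup only for jobs that reached a final state. */
class JobCleanupHook {
    private final JobCleaner blobServer;

    JobCleanupHook(JobCleaner blobServer) {
        this.blobServer = blobServer;
    }

    void onJobStateChange(String jobId, JobState newState) {
        if (newState.isFinal()) {
            blobServer.cleanupJob(jobId);
        }
    }
}
```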

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 8728186..a7c6120 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -90,7 +89,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			ResourceID resourceId,
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityServices,
-			BlobService blobService,
+			BlobServer blobService,
 			HeartbeatServices heartbeatServices,
 			MetricRegistry metricRegistry,
 			FatalErrorHandler fatalErrorHandler) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 9aff6f9..c8fc4e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -23,9 +23,12 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.util.ExceptionUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.URL;
 import java.util.Arrays;
@@ -33,72 +36,52 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * For each job graph that is submitted to the system the library cache manager maintains
- * a set of libraries (typically JAR files) which the job requires to run. The library cache manager
- * caches library files in order to avoid unnecessary retransmission of data. It is based on a singleton
- * programming pattern, so there exists at most one library manager at a time.
- * <p>
- * All files registered via {@link #registerJob(JobID, Collection, Collection)} are reference-counted
- * and are removed by a timer-based cleanup task if their reference counter is zero.
+ * Provides facilities to download a set of libraries (typically JAR files) for a job from a
+ * {@link BlobService} and create a class loader with references to them.
  */
-public final class BlobLibraryCacheManager extends TimerTask implements LibraryCacheManager {
+public class BlobLibraryCacheManager implements LibraryCacheManager {
+
+	private static final Logger LOG = LoggerFactory.getLogger(BlobLibraryCacheManager.class);
+
+	private static final ExecutionAttemptID JOB_ATTEMPT_ID = new ExecutionAttemptID(-1, -1);
 
-	private static Logger LOG = LoggerFactory.getLogger(BlobLibraryCacheManager.class);
-	
-	private static ExecutionAttemptID JOB_ATTEMPT_ID = new ExecutionAttemptID(-1, -1);
-	
 	// --------------------------------------------------------------------------------------------
-	
+
 	/** The global lock to synchronize operations */
 	private final Object lockObject = new Object();
 
 	/** Registered entries per job */
-	private final Map<JobID, LibraryCacheEntry> cacheEntries = new HashMap<JobID, LibraryCacheEntry>();
-	
-	/** Map to store the number of reference to a specific file */
-	private final Map<BlobKey, Integer> blobKeyReferenceCounters = new HashMap<BlobKey, Integer>();
+	private final Map<JobID, LibraryCacheEntry> cacheEntries = new HashMap<>();
 
 	/** The blob service to download libraries */
 	private final BlobService blobService;
-	
-	private final Timer cleanupTimer;
-	
+
 	// --------------------------------------------------------------------------------------------
 
-	/**
-	 * Creates the blob library cache manager.
-	 *
-	 * @param blobService blob file retrieval service to use
-	 * @param cleanupInterval cleanup interval in milliseconds
-	 */
-	public BlobLibraryCacheManager(BlobService blobService, long cleanupInterval) {
+	public BlobLibraryCacheManager(BlobService blobService) {
 		this.blobService = checkNotNull(blobService);
-
-		// Initializing the clean up task
-		this.cleanupTimer = new Timer(true);
-		this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
 	}
 
-	// --------------------------------------------------------------------------------------------
-	
 	@Override
 	public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles, Collection<URL> requiredClasspaths)
-			throws IOException {
+		throws IOException {
 		registerTask(id, JOB_ATTEMPT_ID, requiredJarFiles, requiredClasspaths);
 	}
-	
+
 	@Override
-	public void registerTask(JobID jobId, ExecutionAttemptID task, Collection<BlobKey> requiredJarFiles,
-			Collection<URL> requiredClasspaths) throws IOException {
+	public void registerTask(
+		JobID jobId,
+		ExecutionAttemptID task,
+		@Nullable Collection<BlobKey> requiredJarFiles,
+		@Nullable Collection<URL> requiredClasspaths) throws IOException {
+
 		checkNotNull(jobId, "The JobId must not be null.");
 		checkNotNull(task, "The task execution id must not be null.");
 
@@ -113,43 +96,31 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 			LibraryCacheEntry entry = cacheEntries.get(jobId);
 
 			if (entry == null) {
-				// create a new entry in the library cache
-				BlobKey[] keys = requiredJarFiles.toArray(new BlobKey[requiredJarFiles.size()]);
-				URL[] urls = new URL[keys.length + requiredClasspaths.size()];
-
+				URL[] urls = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
 				int count = 0;
 				try {
-					for (; count < keys.length; count++) {
-						BlobKey blobKey = keys[count];
-						urls[count] = registerReferenceToBlobKeyAndGetURL(blobKey);
-					}
-				}
-				catch (Throwable t) {
-					// undo the reference count increases
-					try {
-						for (int i = 0; i < count; i++) {
-							unregisterReferenceToBlobKey(keys[i]);
-						}
+					// add URLs to locally cached JAR files
+					for (BlobKey key : requiredJarFiles) {
+						urls[count] = blobService.getFile(jobId, key).toURI().toURL();
+						++count;
 					}
-					catch (Throwable tt) {
-						LOG.error("Error while updating library reference counters.", tt);
+
+					// add classpaths
+					for (URL url : requiredClasspaths) {
+						urls[count] = url;
+						++count;
 					}
 
+					cacheEntries.put(jobId, new LibraryCacheEntry(
+						requiredJarFiles, requiredClasspaths, urls, task));
+				} catch (Throwable t) {
 					// rethrow or wrap
 					ExceptionUtils.tryRethrowIOException(t);
-					throw new IOException("Library cache could not register the user code libraries.", t);
+					throw new IOException(
+						"Library cache could not register the user code libraries.", t);
 				}
-
-				// add classpaths
-				for (URL url : requiredClasspaths) {
-					urls[count] = url;
-					count++;
-				}
-
-				cacheEntries.put(jobId, new LibraryCacheEntry(requiredJarFiles, urls, task));
-			}
-			else {
-				entry.register(task, requiredJarFiles);
+			} else {
+				entry.register(task, requiredJarFiles, requiredClasspaths);
 			}
 		}
 	}
@@ -158,7 +129,7 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 	public void unregisterJob(JobID id) {
 		unregisterTask(id, JOB_ATTEMPT_ID);
 	}
-	
+
 	@Override
 	public void unregisterTask(JobID jobId, ExecutionAttemptID task) {
 		checkNotNull(jobId, "The JobId must not be null.");
@@ -172,162 +143,167 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 					cacheEntries.remove(jobId);
 
 					entry.releaseClassLoader();
-
-					for (BlobKey key : entry.getLibraries()) {
-						unregisterReferenceToBlobKey(key);
-					}
 				}
 			}
 			// else has already been unregistered
 		}
 	}
-
+	
 	@Override
-	public ClassLoader getClassLoader(JobID id) {
-		if (id == null) {
-			throw new IllegalArgumentException("The JobId must not be null.");
-		}
-		
+	public ClassLoader getClassLoader(JobID jobId) {
+		checkNotNull(jobId, "The JobId must not be null.");
+
 		synchronized (lockObject) {
-			LibraryCacheEntry entry = cacheEntries.get(id);
-			if (entry != null) {
-				return entry.getClassLoader();
-			} else {
-				throw new IllegalStateException("No libraries are registered for job " + id);
+			LibraryCacheEntry entry = cacheEntries.get(jobId);
+			if (entry == null) {
+				throw new IllegalStateException("No libraries are registered for job " + jobId);
 			}
+			return entry.getClassLoader();
 		}
 	}
 
-	public int getBlobServerPort() {
-		return blobService.getPort();
-	}
-
-	@Override
-	public void shutdown() throws IOException{
-		try {
-			run();
-		} catch (Throwable t) {
-			LOG.warn("Failed to run clean up task before shutdown", t);
-		}
-
-		blobService.close();
-		cleanupTimer.cancel();
-	}
-	
 	/**
-	 * Cleans up blobs which are not referenced anymore
+	 * Gets the number of tasks holding {@link ClassLoader} references for the given job.
+	 *
+	 * @param jobId ID of a job
+	 *
+	 * @return number of reference holders
 	 */
-	@Override
-	public void run() {
-		synchronized (lockObject) {
-			Iterator<Map.Entry<BlobKey, Integer>> entryIter = blobKeyReferenceCounters.entrySet().iterator();
-			
-			while (entryIter.hasNext()) {
-				Map.Entry<BlobKey, Integer> entry = entryIter.next();
-				BlobKey key = entry.getKey();
-				int references = entry.getValue();
-				
-				try {
-					if (references <= 0) {
-						blobService.delete(key);
-						entryIter.remove();
-					}
-				} catch (Throwable t) {
-					LOG.warn("Could not delete file with blob key" + key, t);
-				}
-			}
-		}
-	}
-	
-	public int getNumberOfReferenceHolders(JobID jobId) {
+	int getNumberOfReferenceHolders(JobID jobId) {
 		synchronized (lockObject) {
 			LibraryCacheEntry entry = cacheEntries.get(jobId);
 			return entry == null ? 0 : entry.getNumberOfReferenceHolders();
 		}
 	}
-	
-	int getNumberOfCachedLibraries() {
-		return blobKeyReferenceCounters.size();
-	}
-	
-	private URL registerReferenceToBlobKeyAndGetURL(BlobKey key) throws IOException {
-		// it is important that we fetch the URL before increasing the counter.
-		// in case the URL cannot be created (failed to fetch the BLOB), we have no stale counter
-		try {
-			URL url = blobService.getFile(key).toURI().toURL();
 
-			Integer references = blobKeyReferenceCounters.get(key);
-			int newReferences = references == null ? 1 : references + 1;
-			blobKeyReferenceCounters.put(key, newReferences);
-
-			return url;
-		}
-		catch (IOException e) {
-			throw new IOException("Cannot get library with hash " + key, e);
-		}
+	/**
+	 * Returns the number of registered jobs that this library cache manager handles.
+	 *
+	 * @return number of jobs (irrespective of the actual number of tasks per job)
+	 */
+	int getNumberOfManagedJobs() {
+		// no synchronisation necessary
+		return cacheEntries.size();
 	}
-	
-	private void unregisterReferenceToBlobKey(BlobKey key) {
-		Integer references = blobKeyReferenceCounters.get(key);
-		if (references != null) {
-			int newReferences = Math.max(references - 1, 0);
-			blobKeyReferenceCounters.put(key, newReferences);
-		}
-		else {
-			// make sure we have an entry in any case, that the cleanup timer removes any
-			// present libraries
-			blobKeyReferenceCounters.put(key, 0);
+
+	@Override
+	public void shutdown() {
+		synchronized (lockObject) {
+			for (LibraryCacheEntry entry : cacheEntries.values()) {
+				entry.releaseClassLoader();
+			}
 		}
 	}
 
-
 	// --------------------------------------------------------------------------------------------
 
 	/**
 	 * An entry in the per-job library cache. Tracks which execution attempts
 	 * still reference the libraries. Once none reference it any more, the
-	 * libraries can be cleaned up.
+	 * class loaders can be cleaned up.
 	 */
 	private static class LibraryCacheEntry {
-		
+
 		private final FlinkUserCodeClassLoader classLoader;
-		
+
 		private final Set<ExecutionAttemptID> referenceHolders;
-		
+		/**
+		 * Set of BLOB keys used for a previous job/task registration.
+		 *
+		 * <p>The purpose of this is to make sure that future registrations do not differ in content as
+		 * this is a contract of the {@link BlobLibraryCacheManager}.
+		 */
 		private final Set<BlobKey> libraries;
-		
-		
-		public LibraryCacheEntry(Collection<BlobKey> libraries, URL[] libraryURLs, ExecutionAttemptID initialReference) {
+
+		/**
+		 * Set of class path URLs used for a previous job/task registration.
+		 *
+		 * <p>The purpose of this is to make sure that future registrations do not differ in content as
+		 * this is a contract of the {@link BlobLibraryCacheManager}.
+		 */
+		private final Set<String> classPaths;
+
+		/**
+		 * Creates a cache entry for a flink class loader with the given <tt>libraryURLs</tt>.
+		 *
+		 * @param requiredLibraries
+		 * 		BLOB keys required by the class loader (stored for ensuring consistency among different
+		 * 		job/task registrations)
+		 * @param requiredClasspaths
+		 * 		class paths required by the class loader (stored for ensuring consistency among
+		 * 		different job/task registrations)
+		 * @param libraryURLs
+		 * 		complete list of URLs to use for the class loader (includes references to the
+		 * 		<tt>requiredLibraries</tt> and <tt>requiredClasspaths</tt>)
+		 * @param initialReference
+		 * 		reference holder ID
+		 */
+		LibraryCacheEntry(
+				Collection<BlobKey> requiredLibraries,
+				Collection<URL> requiredClasspaths,
+				URL[] libraryURLs,
+				ExecutionAttemptID initialReference) {
+
 			this.classLoader = new FlinkUserCodeClassLoader(libraryURLs);
-			this.libraries = new HashSet<>(libraries);
+			// NOTE: do not store the class paths, i.e. URLs, into a set for performance reasons
+			//       see http://findbugs.sourceforge.net/bugDescriptions.html#DMI_COLLECTION_OF_URLS
+			//       -> alternatively, compare their string representation
+			this.classPaths = new HashSet<>(requiredClasspaths.size());
+			for (URL url : requiredClasspaths) {
+				classPaths.add(url.toString());
+			}
+			this.libraries = new HashSet<>(requiredLibraries);
 			this.referenceHolders = new HashSet<>();
 			this.referenceHolders.add(initialReference);
 		}
-		
-		
+
 		public ClassLoader getClassLoader() {
 			return classLoader;
 		}
-		
+
 		public Set<BlobKey> getLibraries() {
 			return libraries;
 		}
-		
-		public void register(ExecutionAttemptID task, Collection<BlobKey> keys) {
-			if (!libraries.containsAll(keys)) {
+
+		public void register(
+				ExecutionAttemptID task, Collection<BlobKey> requiredLibraries,
+				Collection<URL> requiredClasspaths) {
+
+			// Make sure the previous registration referred to the same libraries and class paths.
+			// NOTE: the original collections may contain duplicates and may not already be Set
+			//       collections with fast checks whether an item is contained in it.
+
+			// lazy construction of a new set for faster comparisons
+			if (libraries.size() != requiredLibraries.size() ||
+				!new HashSet<>(requiredLibraries).containsAll(libraries)) {
+
 				throw new IllegalStateException(
-						"The library registration references a different set of libraries than previous registrations for this job.");
+					"The library registration references a different set of library BLOBs than" +
+						" previous registrations for this job:\nold:" + libraries.toString() +
+						"\nnew:" + requiredLibraries.toString());
 			}
-			
+
+			// lazy construction of a new set with String representations of the URLs
+			if (classPaths.size() != requiredClasspaths.size() ||
+				!requiredClasspaths.stream().map(URL::toString).collect(Collectors.toSet())
+					.containsAll(classPaths)) {
+
+				throw new IllegalStateException(
+					"The library registration references a different set of class paths than" +
+						" previous registrations for this job:\nold:" +
+						classPaths.toString() +
+						"\nnew:" + requiredClasspaths.toString());
+			}
+
 			this.referenceHolders.add(task);
 		}
-		
+
 		public boolean unregister(ExecutionAttemptID task) {
 			referenceHolders.remove(task);
 			return referenceHolders.isEmpty();
 		}
-		
-		public int getNumberOfReferenceHolders() {
+
+		int getNumberOfReferenceHolders() {
 			return referenceHolders.size();
 		}
 
@@ -343,5 +319,4 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 			}
 		}
 	}
-
 }
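Each `LibraryCacheEntry` now stores the BLOB keys and class paths of its first registration and rejects later registrations that differ. Class paths are kept as strings because `URL#equals` and `URL#hashCode` may resolve host names, which is the issue behind the FindBugs `DMI_COLLECTION_OF_URLS` warning linked in the diff. The simplified sketch below uses `String` keys and holder IDs (`CacheEntrySketch` is a hypothetical name) and compares de-duplicated sets on both sides:

```java
import java.net.URL;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

/** Simplified per-job cache entry with reference holders and a consistency check. */
class CacheEntrySketch {
    private final Set<String> libraries;
    private final Set<String> classPaths;
    private final Set<String> referenceHolders = new HashSet<>();

    CacheEntrySketch(Collection<String> libraries, Collection<URL> classPaths, String initialHolder) {
        this.libraries = new HashSet<>(libraries);
        this.classPaths = toStrings(classPaths);
        this.referenceHolders.add(initialHolder);
    }

    /** Adds a holder only if it requests exactly the same libraries and class paths. */
    void register(String holder, Collection<String> libs, Collection<URL> paths) {
        if (!libraries.equals(new HashSet<>(libs))) {
            throw new IllegalStateException(
                "Different set of library BLOBs: old=" + libraries + ", new=" + libs);
        }
        if (!classPaths.equals(toStrings(paths))) {
            throw new IllegalStateException(
                "Different set of class paths: old=" + classPaths + ", new=" + paths);
        }
        referenceHolders.add(holder);
    }

    /** Returns true once the last holder is gone and the class loader may be released. */
    boolean unregister(String holder) {
        referenceHolders.remove(holder);
        return referenceHolders.isEmpty();
    }

    int numberOfReferenceHolders() {
        return referenceHolders.size();
    }

    private static Set<String> toStrings(Collection<URL> urls) {
        // compare URLs by their string form to avoid host lookups in URL.equals()/hashCode()
        return urls.stream().map(URL::toString).collect(Collectors.toSet());
    }
}
```

Comparing set against set is a deliberate choice here. The diff's `libraries.size() != requiredLibraries.size()` check compares a de-duplicated set with the raw incoming collection. As a result, a registration whose list merely repeats a key would be reported as different.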

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
index 8e14e58..41eeb18 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
@@ -28,7 +28,7 @@ import java.net.URL;
 import java.util.Collection;
 
 public class FallbackLibraryCacheManager implements LibraryCacheManager {
-	
+
 	private static Logger LOG = LoggerFactory.getLogger(FallbackLibraryCacheManager.class);
 
 	@Override
@@ -40,10 +40,10 @@ public class FallbackLibraryCacheManager implements LibraryCacheManager {
 	public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles, Collection<URL> requiredClasspaths) {
 		LOG.warn("FallbackLibraryCacheManager cannot download files associated with blob keys.");
 	}
-	
+
 	@Override
 	public void registerTask(JobID id, ExecutionAttemptID execution, Collection<BlobKey> requiredJarFiles,
-			Collection<URL> requiredClasspaths) {
+		Collection<URL> requiredClasspaths) {
 		LOG.warn("FallbackLibraryCacheManager cannot download files associated with blob keys.");
 	}
 
@@ -51,7 +51,7 @@ public class FallbackLibraryCacheManager implements LibraryCacheManager {
 	public void unregisterJob(JobID id) {
 		LOG.warn("FallbackLibraryCacheManager does not book keeping of job IDs.");
 	}
-	
+
 	@Override
 	public void unregisterTask(JobID id, ExecutionAttemptID execution) {
 		LOG.warn("FallbackLibraryCacheManager does not book keeping of job IDs.");

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
index 5f9f443..93c6efd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
@@ -19,14 +19,15 @@
 package org.apache.flink.runtime.execution.librarycache;
 
 import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 
 import java.io.IOException;
 import java.net.URL;
 import java.util.Collection;
 
 public interface LibraryCacheManager {
+
 	/**
 	 * Returns the user code class loader associated with id.
 	 *
@@ -36,30 +37,34 @@ public interface LibraryCacheManager {
 	ClassLoader getClassLoader(JobID id);
 
 	/**
-	 * Registers a job with its required jar files and classpaths. The jar files are identified by their blob keys.
+	 * Registers a job with its required jar files and classpaths. The jar files are identified by
+	 * their blob keys and downloaded for use by a {@link ClassLoader}.
 	 *
 	 * @param id job ID
 	 * @param requiredJarFiles collection of blob keys identifying the required jar files
 	 * @param requiredClasspaths collection of classpaths that are added to the user code class loader
+	 *
 	 * @throws IOException if any error occurs when retrieving the required jar files
 	 *
 	 * @see #unregisterJob(JobID) counterpart of this method
 	 */
 	void registerJob(JobID id, Collection<BlobKey> requiredJarFiles, Collection<URL> requiredClasspaths)
-			throws IOException;
-	
+		throws IOException;
+
 	/**
-	 * Registers a job task execution with its required jar files and classpaths. The jar files are identified by their blob keys.
+	 * Registers a job task execution with its required jar files and classpaths. The jar files are
+	 * identified by their blob keys and downloaded for use by a {@link ClassLoader}.
 	 *
 	 * @param id job ID
 	 * @param requiredJarFiles collection of blob keys identifying the required jar files
 	 * @param requiredClasspaths collection of classpaths that are added to the user code class loader
-	 * @throws IOException
+	 *
+	 * @throws IOException if any error occurs when retrieving the required jar files
 	 *
 	 * @see #unregisterTask(JobID, ExecutionAttemptID) counterpart of this method
 	 */
 	void registerTask(JobID id, ExecutionAttemptID execution, Collection<BlobKey> requiredJarFiles,
-			Collection<URL> requiredClasspaths) throws IOException;
+		Collection<URL> requiredClasspaths) throws IOException;
 
 	/**
 	 * Unregisters a job task execution from the library cache manager.
@@ -88,9 +93,7 @@ public interface LibraryCacheManager {
 	void unregisterJob(JobID id);
 
 	/**
-	 * Shutdown method
-	 *
-	 * @throws IOException
+	 * Shutdown method which may release created class loaders.
 	 */
-	void shutdown() throws IOException;
+	void shutdown();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 6b92d79..c126875 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -536,7 +536,7 @@ public class JobGraph implements Serializable {
 			Configuration blobClientConfig) throws IOException {
 		if (!userJars.isEmpty()) {
 			// TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
-			List<BlobKey> blobKeys = BlobClient.uploadJarFiles(blobServerAddress, blobClientConfig, userJars);
+			List<BlobKey> blobKeys = BlobClient.uploadJarFiles(blobServerAddress, blobClientConfig, jobID, userJars);
 
 			for (BlobKey blobKey : blobKeys) {
 				if (!userJarBlobKeys.contains(blobKey)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 5838cf2..c312cd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobService;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -93,7 +93,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			final Configuration configuration,
 			final RpcService rpcService,
 			final HighAvailabilityServices haServices,
-			final BlobService blobService,
+			final BlobServer blobService,
 			final HeartbeatServices heartbeatServices,
 			final OnCompletionActions toNotifyOnComplete,
 			final FatalErrorHandler errorHandler) throws Exception {
@@ -116,7 +116,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			final Configuration configuration,
 			final RpcService rpcService,
 			final HighAvailabilityServices haServices,
-			final BlobService blobService,
+			final BlobServer blobService,
 			final HeartbeatServices heartbeatServices,
 			final MetricRegistry metricRegistry,
 			final OnCompletionActions toNotifyOnComplete,
@@ -199,6 +199,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 				haServices,
 				heartbeatServices,
 				jobManagerServices.executorService,
+				jobManagerServices.blobServer,
 				jobManagerServices.libraryCacheManager,
 				jobManagerServices.restartStrategyFactory,
 				jobManagerServices.rpcAskTimeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index e14f5af..57aeaff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -19,11 +19,10 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.blob.BlobService;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
@@ -45,6 +44,7 @@ public class JobManagerServices {
 
 	public final ScheduledExecutorService executorService;
 
+	public final BlobServer blobServer;
 	public final BlobLibraryCacheManager libraryCacheManager;
 
 	public final RestartStrategyFactory restartStrategyFactory;
@@ -53,11 +53,13 @@ public class JobManagerServices {
 
 	public JobManagerServices(
 			ScheduledExecutorService executorService,
+			BlobServer blobServer,
 			BlobLibraryCacheManager libraryCacheManager,
 			RestartStrategyFactory restartStrategyFactory,
 			Time rpcAskTimeout) {
 
 		this.executorService = checkNotNull(executorService);
+		this.blobServer = checkNotNull(blobServer);
 		this.libraryCacheManager = checkNotNull(libraryCacheManager);
 		this.restartStrategyFactory = checkNotNull(restartStrategyFactory);
 		this.rpcAskTimeout = checkNotNull(rpcAskTimeout);
@@ -80,8 +82,9 @@ public class JobManagerServices {
 			firstException = t;
 		}
 
+		libraryCacheManager.shutdown();
 		try {
-			libraryCacheManager.shutdown();
+			blobServer.close();
 		}
 		catch (Throwable t) {
 			if (firstException == null) {
@@ -103,16 +106,12 @@ public class JobManagerServices {
 
 	public static JobManagerServices fromConfiguration(
 			Configuration config,
-			BlobService blobService) throws Exception {
+			BlobServer blobServer) throws Exception {
 
 		Preconditions.checkNotNull(config);
-		Preconditions.checkNotNull(blobService);
+		Preconditions.checkNotNull(blobServer);
 
-		final long cleanupInterval = config.getLong(
-			ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
-			ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
-
-		final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(blobService, cleanupInterval);
+		final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(blobServer);
 
 		final FiniteDuration timeout;
 		try {
@@ -127,6 +126,7 @@ public class JobManagerServices {
 
 		return new JobManagerServices(
 			futureExecutor,
+			blobServer,
 			libraryCacheManager,
 			RestartStrategyFactory.createRestartStrategyFactory(config),
 			Time.of(timeout.length(), timeout.unit()));

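The reworked shutdown in JobManagerServices calls libraryCacheManager.shutdown() unguarded, since that method no longer throws, and wraps blobServer.close() in the existing try/catch that remembers the first Throwable. The following is a minimal sketch of that "run every step, keep the first failure" pattern. The class ShutdownSequence, its Step interface, and the use of Throwable#addSuppressed for later failures are illustrative assumptions, not code from this commit:

```java
import java.util.List;

// Hypothetical helper illustrating the first-exception shutdown pattern.
class ShutdownSequence {

    /** One shutdown action (e.g. closing a BLOB server) that may fail. */
    @FunctionalInterface
    interface Step {
        void run() throws Exception;
    }

    /**
     * Runs every step even if earlier ones fail, so no component is left open.
     * The first failure is rethrown; later failures are attached as suppressed.
     */
    static void runAll(List<Step> steps) throws Exception {
        Exception firstException = null;
        for (Step step : steps) {
            try {
                step.run();
            } catch (Exception e) {
                if (firstException == null) {
                    firstException = e;
                } else {
                    firstException.addSuppressed(e);
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }
}
```

Continuing past a failed step matters here because the BLOB server holds a server socket and local storage directories that should be released even if an earlier component failed to stop.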
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index d6019db..a8a8632 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -149,7 +150,10 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 	/** Service to contend for and retrieve the leadership of JM and RM */
 	private final HighAvailabilityServices highAvailabilityServices;
 
-	/** Blob cache manager used across jobs */
+	/** Blob server used across jobs */
+	private final BlobServer blobServer;
+
+	/** Blob library cache manager used across jobs */
 	private final BlobLibraryCacheManager libraryCacheManager;
 
 	/** The metrics for the JobManager itself */
@@ -204,6 +208,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 			HighAvailabilityServices highAvailabilityService,
 			HeartbeatServices heartbeatServices,
 			ScheduledExecutorService executor,
+			BlobServer blobServer,
 			BlobLibraryCacheManager libraryCacheManager,
 			RestartStrategyFactory restartStrategyFactory,
 			Time rpcAskTimeout,
@@ -221,6 +226,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 		this.configuration = checkNotNull(configuration);
 		this.rpcTimeout = rpcAskTimeout;
 		this.highAvailabilityServices = checkNotNull(highAvailabilityService);
+		this.blobServer = checkNotNull(blobServer);
 		this.libraryCacheManager = checkNotNull(libraryCacheManager);
 		this.executor = checkNotNull(executor);
 		this.jobCompletionActions = checkNotNull(jobCompletionActions);
@@ -698,7 +704,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 	@Override
 	public CompletableFuture<ClassloadingProps> requestClassloadingProps() {
 		return CompletableFuture.completedFuture(
-			new ClassloadingProps(libraryCacheManager.getBlobServerPort(),
+			new ClassloadingProps(blobServer.getPort(),
 				executionGraph.getRequiredJarFiles(),
 				executionGraph.getRequiredClasspaths()));
 	}
@@ -785,7 +791,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 
 		if (registeredTaskManagers.containsKey(taskManagerId)) {
 			final RegistrationResponse response = new JMTMRegistrationSuccess(
-				resourceId, libraryCacheManager.getBlobServerPort());
+				resourceId, blobServer.getPort());
 			return CompletableFuture.completedFuture(response);
 		} else {
 			return getRpcService()
@@ -819,7 +825,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 							}
 						});
 
-						return new JMTMRegistrationSuccess(resourceId, libraryCacheManager.getBlobServerPort());
+						return new JMTMRegistrationSuccess(resourceId, blobServer.getPort());
 					},
 					getMainThreadExecutor());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
index 98c7bf1..363c107 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
@@ -53,6 +54,9 @@ public class JobManagerConnection {
 	// Checkpoint responder for the specific job manager
 	private final CheckpointResponder checkpointResponder;
 
+	// BLOB cache connected to the BLOB server at the specific job manager
+	private final BlobCache blobCache;
+
 	// Library cache manager connected to the specific job manager
 	private final LibraryCacheManager libraryCacheManager;
 
@@ -63,21 +67,22 @@ public class JobManagerConnection {
 	private final PartitionProducerStateChecker partitionStateChecker;
 
 	public JobManagerConnection(
-		JobID jobID,
-		ResourceID resourceID,
-		JobMasterGateway jobMasterGateway,
-		UUID leaderId,
-		TaskManagerActions taskManagerActions,
-		CheckpointResponder checkpointResponder,
-		LibraryCacheManager libraryCacheManager,
-		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
-		PartitionProducerStateChecker partitionStateChecker) {
+				JobID jobID,
+				ResourceID resourceID,
+				JobMasterGateway jobMasterGateway,
+				UUID leaderId,
+				TaskManagerActions taskManagerActions,
+				CheckpointResponder checkpointResponder,
+				BlobCache blobCache, LibraryCacheManager libraryCacheManager,
+				ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
+				PartitionProducerStateChecker partitionStateChecker) {
 		this.jobID = Preconditions.checkNotNull(jobID);
 		this.resourceID = Preconditions.checkNotNull(resourceID);
 		this.leaderId = Preconditions.checkNotNull(leaderId);
 		this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
 		this.taskManagerActions = Preconditions.checkNotNull(taskManagerActions);
 		this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder);
+		this.blobCache = Preconditions.checkNotNull(blobCache);
 		this.libraryCacheManager = Preconditions.checkNotNull(libraryCacheManager);
 		this.resultPartitionConsumableNotifier = Preconditions.checkNotNull(resultPartitionConsumableNotifier);
 		this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker);
@@ -111,6 +116,15 @@ public class JobManagerConnection {
 		return libraryCacheManager;
 	}
 
+	/**
+	 * Gets the BLOB cache connected to the respective BLOB server instance at the job manager.
+	 *
+	 * @return BLOB cache
+	 */
+	public BlobCache getBlobCache() {
+		return blobCache;
+	}
+
 	public ResultPartitionConsumableNotifier getResultPartitionConsumableNotifier() {
 		return resultPartitionConsumableNotifier;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 4abcdf4..a5ce84b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -352,6 +352,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 			TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
 			CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
+			BlobCache blobCache = jobManagerConnection.getBlobCache();
 			LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager();
 			ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
 			PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();
@@ -374,6 +375,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 				taskManagerActions,
 				inputSplitProvider,
 				checkpointResponder,
+				blobCache,
 				libraryCache,
 				fileCache,
 				taskManagerConfiguration,
@@ -935,14 +937,13 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		InetSocketAddress blobServerAddress = new InetSocketAddress(jobMasterGateway.getHostname(), blobPort);
 
 		final LibraryCacheManager libraryCacheManager;
+		final BlobCache blobCache;
 		try {
-			final BlobCache blobCache = new BlobCache(
+			blobCache = new BlobCache(
 				blobServerAddress,
 				taskManagerConfiguration.getConfiguration(),
 				haServices.createBlobStore());
-			libraryCacheManager = new BlobLibraryCacheManager(
-				blobCache,
-				taskManagerConfiguration.getCleanupInterval());
+			libraryCacheManager = new BlobLibraryCacheManager(blobCache);
 		} catch (IOException e) {
 			// Can't pass the IOException up - we need a RuntimeException anyway
 			// two levels up where this is run asynchronously. Also, we don't
@@ -967,6 +968,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			jobManagerLeaderId,
 			taskManagerActions,
 			checkpointResponder,
+			blobCache,
 			libraryCacheManager,
 			resultPartitionConsumableNotifier,
 			partitionStateChecker);
@@ -977,6 +979,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway();
 		jobManagerGateway.disconnectTaskManager(getResourceID(), cause);
 		jobManagerConnection.getLibraryCacheManager().shutdown();
+		jobManagerConnection.getBlobCache().close();
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index ea9f576..7c7693b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -53,8 +54,6 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 	private final Time maxRegistrationPause;
 	private final Time refusedRegistrationPause;
 
-	private final long cleanupInterval;
-
 	private final UnmodifiableConfiguration configuration;
 
 	private final boolean exitJvmOnOutOfMemory;
@@ -78,7 +77,6 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 		this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause);
 		this.maxRegistrationPause = Preconditions.checkNotNull(maxRegistrationPause);
 		this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause);
-		this.cleanupInterval = Preconditions.checkNotNull(cleanupInterval);
 		this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
 		this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
 	}
@@ -107,10 +105,6 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 		return refusedRegistrationPause;
 	}
 
-	public long getCleanupInterval() {
-		return cleanupInterval;
-	}
-
 	@Override
 	public Configuration getConfiguration() {
 		return configuration;
@@ -153,9 +147,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 
 		LOG.info("Messages have a max timeout of " + timeout);
 
-		final long cleanupInterval = configuration.getLong(
-			ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
-			ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+		final long cleanupInterval = configuration.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
 
 		final Time finiteRegistrationDuration;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 04cb990..d628960 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -30,6 +30,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.SafetyNetCloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -203,7 +204,10 @@ public class Task implements Runnable, TaskActions {
 	/** All listener that want to be notified about changes in the task's execution state */
 	private final List<TaskExecutionStateListener> taskExecutionStateListeners;
 
-	/** The library cache, from which the task can request its required JAR files */
+	/** The BLOB cache, from which the task can request BLOB files */
+	private final BlobCache blobCache;
+
+	/** The library cache, from which the task can request its class loader */
 	private final LibraryCacheManager libraryCache;
 
 	/** The cache for user-defined files that the invokable requires */
@@ -282,6 +286,7 @@ public class Task implements Runnable, TaskActions {
 		TaskManagerActions taskManagerActions,
 		InputSplitProvider inputSplitProvider,
 		CheckpointResponder checkpointResponder,
+		BlobCache blobCache,
 		LibraryCacheManager libraryCache,
 		FileCache fileCache,
 		TaskManagerRuntimeInfo taskManagerConfig,
@@ -330,6 +335,7 @@ public class Task implements Runnable, TaskActions {
 		this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder);
 		this.taskManagerActions = checkNotNull(taskManagerActions);
 
+		this.blobCache = Preconditions.checkNotNull(blobCache);
 		this.libraryCache = Preconditions.checkNotNull(libraryCache);
 		this.fileCache = Preconditions.checkNotNull(fileCache);
 		this.network = Preconditions.checkNotNull(networkEnvironment);
@@ -568,6 +574,8 @@ public class Task implements Runnable, TaskActions {
 			LOG.info("Creating FileSystem stream leak safety net for task {}", this);
 			FileSystemSafetyNet.initializeSafetyNetForThread();
 
+			blobCache.registerJob(jobId);
+
 			// first of all, get a user-code classloader
 			// this may involve downloading the job's JAR files and/or classes
 			LOG.info("Loading JAR files for task {}.", this);
@@ -827,6 +835,7 @@ public class Task implements Runnable, TaskActions {
 
 				// remove all of the tasks library resources
 				libraryCache.unregisterTask(jobId, executionId);
+				blobCache.releaseJob(jobId);
 
 				// remove all files in the distributed cache
 				removeCachedFiles(distributedCacheEntries, fileCache);
@@ -862,7 +871,7 @@ public class Task implements Runnable, TaskActions {
 		// triggers the download of all missing jar files from the job manager
 		libraryCache.registerTask(jobId, executionId, requiredJarFiles, requiredClasspaths);
 
-		LOG.debug("Register task {} at library cache manager took {} milliseconds",
+		LOG.debug("Getting user code class loader for task {} at library cache manager took {} milliseconds",
 				executionId, System.currentTimeMillis() - startDownloadTime);
 
 		ClassLoader userCodeClassLoader = libraryCache.getClassLoader(jobId);

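With this change, Task brackets its execution with blobCache.registerJob(jobId) and blobCache.releaseJob(jobId), so the BLOB cache counts references per job rather than per BlobKey. The following is a minimal sketch of such job-based ref-counting with deferred (TTL-based) cleanup. It is not Flink's BlobCache implementation: the class name JobRefCounter, the use of String job IDs, and the explicit nowMillis parameter (added to keep the sketch deterministic) are all assumptions:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch, not Flink's BlobCache: job IDs are plain Strings here and
// "deleting files" is reduced to dropping the bookkeeping entry.
class JobRefCounter {

    /** Per-job state: number of live references and the earliest time cleanup may happen. */
    private static final class RefCount {
        int references = 0;
        long keepUntil = -1L; // -1 means "no cleanup scheduled"
    }

    private final Map<String, RefCount> jobRefCounters = new HashMap<>();
    private final long cleanupIntervalMillis;

    JobRefCounter(long cleanupIntervalMillis) {
        this.cleanupIntervalMillis = cleanupIntervalMillis;
    }

    /** Counterpart of blobCache.registerJob(jobId) when a task starts. */
    synchronized void registerJob(String jobId) {
        RefCount ref = jobRefCounters.computeIfAbsent(jobId, id -> new RefCount());
        ref.references++;
        ref.keepUntil = -1L; // a new reference cancels any pending cleanup
    }

    /** Counterpart of blobCache.releaseJob(jobId) when a task finishes. */
    synchronized void releaseJob(String jobId, long nowMillis) {
        RefCount ref = jobRefCounters.get(jobId);
        if (ref == null || ref.references == 0) {
            return; // unknown job or unbalanced release: ignore
        }
        ref.references--;
        if (ref.references == 0) {
            // keep the job's files for one more interval so that a recovery can reuse them
            ref.keepUntil = nowMillis + cleanupIntervalMillis;
        }
    }

    /** Periodic cleanup task; returns the number of jobs whose entries were removed. */
    synchronized int cleanup(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, RefCount>> it = jobRefCounters.entrySet().iterator();
        while (it.hasNext()) {
            RefCount ref = it.next().getValue();
            if (ref.references == 0 && ref.keepUntil >= 0 && nowMillis >= ref.keepUntil) {
                // a real cache would delete the job's BLOB files at this point
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    synchronized boolean isTracked(String jobId) {
        return jobRefCounters.containsKey(jobId);
    }
}
```

If cleanup() runs once per interval, a job released just after one run is only removed by the second run after that, so its files can linger for up to about two cleanup intervals. This is the price of giving a recovering task the chance to reuse files that are already present.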
