flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/6] git commit: [FLINK-1205] Fix library cache manager to track references to tasks and revent accidental duplicate registration/deregistration
Date Tue, 04 Nov 2014 10:06:36 GMT
[FLINK-1205] Fix library cache manager to track references to tasks and revent accidental duplicate
registration/deregistration


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a6152c37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a6152c37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a6152c37

Branch: refs/heads/master
Commit: a6152c372b86d3a62745b3703975d5d4eb243053
Parents: 5e48fc9
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Nov 3 20:43:00 2014 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Nov 3 20:43:00 2014 +0100

----------------------------------------------------------------------
 .../librarycache/BlobLibraryCacheManager.java   | 326 +++++++++----------
 .../FallbackLibraryCacheManager.java            |  16 +-
 .../librarycache/LibraryCacheManager.java       |  25 +-
 .../flink/runtime/jobmanager/JobManager.java    |   6 +-
 .../flink/runtime/taskmanager/TaskManager.java  |  11 +-
 .../BlobLibraryCacheManagerTest.java            |   6 +-
 6 files changed, 206 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a6152c37/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 441f57e..70b60fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -30,17 +31,18 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobService;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * For each job graph that is submitted to the system the library cache manager maintains
  * a set of libraries (typically JAR files) which the job requires to run. The library cache
manager
@@ -52,43 +54,25 @@ import org.slf4j.LoggerFactory;
 public final class BlobLibraryCacheManager extends TimerTask implements LibraryCacheManager
{
 
 	private static Logger LOG = LoggerFactory.getLogger(BlobLibraryCacheManager.class);
-
-	/**
-	 * Dummy object used in the lock map.
-	 */
+	
+	private static ExecutionAttemptID JOB_ATTEMPT_ID = new ExecutionAttemptID();
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/** The global lock to synchronize operations */
 	private final Object lockObject = new Object();
 
-	/**
-	 * Map to translate a job ID to the responsible class loaders.
-	 */
-	private final ConcurrentMap<JobID, URLClassLoader> classLoaders = new
-			ConcurrentHashMap<JobID, URLClassLoader>();
-
-	/**
-	 * Map to store the number of references to a specific library manager entry.
-	 */
-	private final Map<JobID, Integer> libraryReferenceCounter = new HashMap<JobID,
-			Integer>();
-
-	/**
-	 * Map to store the blob keys referenced by a specific job
-	 */
-	private final Map<JobID, Collection<BlobKey>> requiredJars = new
-			HashMap<JobID, Collection<BlobKey>>();
-
-	/**
-	 * Map to store the number of reference to a specific file
-	 */
-	private final Map<BlobKey, Integer> blobKeyReferenceCounter = new
-			HashMap<BlobKey, Integer>();
-
-	/**
-	 * All registered blobs
-	 */
-	private final Set<BlobKey> registeredBlobs = new HashSet<BlobKey>();
+	/** Registered entries per job */
+	private final Map<JobID, LibraryCacheEntry> cacheEntries = new HashMap<JobID, LibraryCacheEntry>();
+	
+	/** Map to store the number of reference to a specific file */
+	private final Map<BlobKey, Integer> blobKeyReferenceCounters = new HashMap<BlobKey,
Integer>();
 
+	/** The blob service to download libraries */
 	private final BlobService blobService;
-
+	
+	// --------------------------------------------------------------------------------------------
+	
 	public BlobLibraryCacheManager(BlobService blobService, Configuration configuration){
 		this.blobService = blobService;
 
@@ -100,143 +84,82 @@ public final class BlobLibraryCacheManager extends TimerTask implements
LibraryC
 		timer.schedule(this, cleanupInterval);
 	}
 
-	/**
-	 * Increments the reference counter of the corrsponding map
-	 *
-	 * @param key
-	 *        the key identifying the counter to increment
-	 * @return the increased reference counter
-	 */
-	private <K> int incrementReferenceCounter(final K key, final Map<K,
-	Integer> map) {
-
-		if(!map.containsKey(key)){
-			map.put(key, 1);
-
-			return 1;
-		}else{
-			int counter = map.get(key) + 1;
-			map.put(key, counter);
-
-			return counter;
-		}
-	}
-
-	/**
-	 * Decrements the reference counter associated with the key
-	 *
-	 * @param key
-	 *        the key identifying the counter to decrement
-	 * @return the decremented reference counter
-	 */
-	private <K> int decrementReferenceCounter(final K key, final Map<K,
-			Integer> map) {
-
-		if (!map.containsKey(key)) {
-			throw new IllegalStateException("Cannot find reference counter entry for key " + key);
-		}else{
-			int counter = map.get(key) -1;
-
-			if(counter == 0){
-				map.remove(key);
-			}else{
-				map.put(key, counter);
-			}
-
-			return counter;
-		}
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles) throws IOException
{
+		registerTask(id, JOB_ATTEMPT_ID, requiredJarFiles);
 	}
-
-	/**
-	 * Registers a job ID with a set of library paths that are required to run the job. For
every registered
-	 * job the library cache manager creates a class loader that is used to instantiate the
vertex's environment later
-	 * on.
-	 * 
-	 * @param id
-	 *        the ID of the job to be registered.
-	 * @param requiredJarFiles
-	 *        the client path's of the required libraries
-	 * @throws IOException
-	 *         thrown if one of the requested libraries is not in the cache
-	 */
+	
 	@Override
-	public void register(final JobID id, final Collection<BlobKey> requiredJarFiles) throws
-			IOException {
-
+	public void registerTask(JobID jobId, ExecutionAttemptID task, Collection<BlobKey>
requiredJarFiles) throws IOException {
+		Preconditions.checkNotNull(jobId, "The JobId must not be null.");
+		Preconditions.checkNotNull(task, "The task execution id must not be null.");
+		
+		if (requiredJarFiles == null) {
+			requiredJarFiles = Collections.emptySet();
+		}
+		
 		synchronized (lockObject) {
-			if (incrementReferenceCounter(id, libraryReferenceCounter) > 1) {
-				return;
-			}
-
-			// Check if library manager entry for this id already exists
-			if (this.classLoaders.containsKey(id)) {
-				throw new IllegalStateException("Library cache manager already contains " +
-						"class loader entry for job ID " + id);
-			}
-
-			if (requiredJars.containsKey(id)) {
-				throw new IllegalStateException("Library cache manager already contains blob keys" +
-						" entry for job ID " + id);
+			LibraryCacheEntry entry = cacheEntries.get(jobId);
+			
+			if (entry == null) {
+				URL[] urls = new URL[requiredJarFiles.size()];
+
+				int count = 0;
+				for (BlobKey blobKey : requiredJarFiles) {
+					urls[count++] = registerReferenceToBlobKeyAndGetURL(blobKey);
+				}
+				
+				URLClassLoader classLoader = new URLClassLoader(urls);
+				cacheEntries.put(jobId, new LibraryCacheEntry(requiredJarFiles, classLoader, task));
 			}
-
-			requiredJars.put(id, requiredJarFiles);
-
-			URL[] urls = new URL[requiredJarFiles.size()];
-			int count = 0;
-
-			for (BlobKey blobKey : requiredJarFiles) {
-				urls[count++] = registerBlobKeyAndGetURL(blobKey);
+			else {
+				entry.register(task, requiredJarFiles);
 			}
-
-			final URLClassLoader classLoader = new URLClassLoader(urls);
-			this.classLoaders.put(id, classLoader);
 		}
 	}
 
-	private URL registerBlobKeyAndGetURL(BlobKey key) throws IOException{
-		if(incrementReferenceCounter(key, blobKeyReferenceCounter) == 1){
-			// registration might happen even if the file is already stored locally
-			registeredBlobs.add(key);
-		}
-
-		return blobService.getURL(key);
+	@Override
+	public void unregisterJob(JobID id) {
+		unregisterTask(id, JOB_ATTEMPT_ID);
 	}
-
-	/**
-	 * Unregisters a job ID and releases the resources associated with it.
-	 * 
-	 * @param id
-	 *        the job ID to unregister
-	 */
+	
 	@Override
-	public void unregister(final JobID id) {
+	public void unregisterTask(JobID jobId, ExecutionAttemptID task) {
+		Preconditions.checkNotNull(jobId, "The JobId must not be null.");
+		Preconditions.checkNotNull(task, "The task execution id must not be null.");
+		
 		synchronized (lockObject) {
-			if (decrementReferenceCounter(id, libraryReferenceCounter) == 0) {
-				this.classLoaders.remove(id);
-
-				Collection<BlobKey> keys = requiredJars.get(id);
-
-				for (BlobKey key : keys) {
-					decrementReferenceCounter(key, blobKeyReferenceCounter);
+			LibraryCacheEntry entry = cacheEntries.get(jobId);
+			
+			if (entry != null) {
+				if (entry.unregister(task)) {
+					cacheEntries.remove(jobId);
+					
+					for (BlobKey key : entry.getLibraries()) {
+						unregisterReferenceToBlobKey(key);
+					}
 				}
-
-				requiredJars.remove(id);
 			}
+			// else has already been unregistered
 		}
-
 	}
 
-	/**
-	 * Returns the class loader to the specified vertex.
-	 * 
-	 * @param id
-	 *        the ID of the job to return the class loader for
-	 * @return the class loader of requested vertex or <code>null</code> if no class
loader has been registered with the
-	 *         given ID.
-	 */
 	@Override
-	public ClassLoader getClassLoader(final JobID id) {
-		return this.classLoaders.get(id);
+	public ClassLoader getClassLoader(JobID id) {
+		if (id == null) {
+			throw new IllegalArgumentException("The JobId must not be null.");
+		}
+		
+		synchronized (lockObject) {
+			LibraryCacheEntry entry = cacheEntries.get(id);
+			if (entry != null) {
+				return entry.getClassLoader();
+			} else {
+				throw new IllegalStateException("No libraries are registered for job " + id);
+			}
+		}
 	}
 
 	@Override
@@ -252,27 +175,100 @@ public final class BlobLibraryCacheManager extends TimerTask implements
LibraryC
 	public void shutdown() throws IOException{
 		blobService.shutdown();
 	}
-
+	
 	/**
 	 * Cleans up blobs which are not referenced anymore
 	 */
 	@Override
 	public void run() {
 		synchronized (lockObject) {
-			Iterator<BlobKey> it = registeredBlobs.iterator();
-
-			while (it.hasNext()) {
-				BlobKey key = it.next();
-
+			
+			Iterator<Map.Entry<BlobKey, Integer>> entryIter = blobKeyReferenceCounters.entrySet().iterator();
+			
+			while (entryIter.hasNext()) {
+				Map.Entry<BlobKey, Integer> entry = entryIter.next();
+				BlobKey key = entry.getKey();
+				int references = entry.getValue();
+				
 				try {
-					if (!blobKeyReferenceCounter.containsKey(key)) {
+					if (references <= 0) {
 						blobService.delete(key);
-						it.remove();
+						entryIter.remove();
 					}
-				} catch (IOException ioe) {
-					LOG.warn("Could not delete file with blob key" + key, ioe);
+				} catch (Throwable t) {
+					LOG.warn("Could not delete file with blob key" + key, t);
 				}
 			}
 		}
 	}
+	
+	public int getNumberOfReferenceHolders(JobID jobId) {
+		synchronized (lockObject) {
+			LibraryCacheEntry entry = cacheEntries.get(jobId);
+			return entry == null ? 0 : entry.getNumberOfReferenceHolders();
+		}
+	}
+	
+	private URL registerReferenceToBlobKeyAndGetURL(BlobKey key) throws IOException {
+		
+		Integer references = blobKeyReferenceCounters.get(key);
+		int newReferences = references == null ? 1 : references.intValue() + 1;
+		
+		blobKeyReferenceCounters.put(key, newReferences);
+
+		return blobService.getURL(key);
+	}
+	
+	private void unregisterReferenceToBlobKey(BlobKey key) {
+		Integer references = blobKeyReferenceCounters.get(key);
+		if (references != null) {
+			int newReferences = Math.max(references.intValue() - 1, 0);
+			blobKeyReferenceCounters.put(key, newReferences);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private static class LibraryCacheEntry {
+		
+		private final ClassLoader classLoader;
+		
+		private final Set<ExecutionAttemptID> referenceHolders;
+		
+		private final Set<BlobKey> libraries;
+		
+		
+		public LibraryCacheEntry(Collection<BlobKey> libraries, ClassLoader classLoader,
ExecutionAttemptID initialReference) {
+			this.classLoader = classLoader;
+			this.libraries = new HashSet<BlobKey>(libraries);
+			this.referenceHolders = new HashSet<ExecutionAttemptID>();
+			this.referenceHolders.add(initialReference);
+		}
+		
+		
+		public ClassLoader getClassLoader() {
+			return classLoader;
+		}
+		
+		public Set<BlobKey> getLibraries() {
+			return libraries;
+		}
+		
+		public void register(ExecutionAttemptID task, Collection<BlobKey> keys) {
+			if (!libraries.containsAll(keys)) {
+				throw new IllegalStateException("The library registration references a different set
of libraries than previous registrations for this job.");
+			}
+			
+			this.referenceHolders.add(task);
+		}
+		
+		public boolean unregister(ExecutionAttemptID task) {
+			referenceHolders.remove(task);
+			return referenceHolders.isEmpty();
+		}
+		
+		public int getNumberOfReferenceHolders() {
+			return referenceHolders.size();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a6152c37/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
index 5c162a0..532107f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.execution.librarycache;
 
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,6 +29,7 @@ import java.io.IOException;
 import java.util.Collection;
 
 public class FallbackLibraryCacheManager implements LibraryCacheManager {
+	
 	private static Logger LOG = LoggerFactory.getLogger(FallbackLibraryCacheManager.class);
 
 	@Override
@@ -41,12 +43,22 @@ public class FallbackLibraryCacheManager implements LibraryCacheManager
{
 	}
 
 	@Override
-	public void register(JobID id, Collection<BlobKey> requiredJarFiles) throws IOException
{
+	public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles) {
+		LOG.warn("FallbackLibraryCacheManager cannot download files associated with blob keys.");
+	}
+	
+	@Override
+	public void registerTask(JobID id, ExecutionAttemptID execution, Collection<BlobKey>
requiredJarFiles) {
 		LOG.warn("FallbackLibraryCacheManager cannot download files associated with blob keys.");
 	}
 
 	@Override
-	public void unregister(JobID id) {
+	public void unregisterJob(JobID id) {
+		LOG.warn("FallbackLibraryCacheManager does not book keeping of job IDs.");
+	}
+	
+	@Override
+	public void unregisterTask(JobID id, ExecutionAttemptID execution) {
 		LOG.warn("FallbackLibraryCacheManager does not book keeping of job IDs.");
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a6152c37/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
index a0a95ab..63d85b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.execution.librarycache;
 
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 
 import java.io.File;
@@ -32,7 +33,7 @@ public interface LibraryCacheManager {
 	 * @param id identifying the job
 	 * @return ClassLoader which can load the user code
 	 */
-	ClassLoader getClassLoader(final JobID id);
+	ClassLoader getClassLoader(JobID id);
 
 	/**
 	 * Returns a file handle to the file identified by the blob key.
@@ -41,7 +42,7 @@ public interface LibraryCacheManager {
 	 * @return File handle
 	 * @throws IOException
 	 */
-	File getFile(final BlobKey blobKey) throws IOException;
+	File getFile(BlobKey blobKey) throws IOException;
 
 	/**
 	 * Registers a job with its required jar files. The jar files are identified by their blob
keys.
@@ -50,14 +51,30 @@ public interface LibraryCacheManager {
 	 * @param requiredJarFiles collection of blob keys identifying the required jar files
 	 * @throws IOException
 	 */
-	void register(final JobID id, final Collection<BlobKey> requiredJarFiles) throws IOException;
+	void registerJob(JobID id, Collection<BlobKey> requiredJarFiles) throws IOException;
+	
+	/**
+	 * Registers a job task execution with its required jar files. The jar files are identified
by their blob keys.
+	 *
+	 * @param id job ID
+	 * @param requiredJarFiles collection of blob keys identifying the required jar files
+	 * @throws IOException
+	 */
+	void registerTask(JobID id, ExecutionAttemptID execution, Collection<BlobKey> requiredJarFiles)
throws IOException;
 
 	/**
 	 * Unregisters a job from the library cache manager.
 	 *
 	 * @param id job ID
 	 */
-	void unregister(final JobID id);
+	void unregisterTask(JobID id, ExecutionAttemptID execution);
+	
+	/**
+	 * Unregisters a job from the library cache manager.
+	 *
+	 * @param id job ID
+	 */
+	void unregisterJob(JobID id);
 
 	/**
 	 * Shutdown method

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a6152c37/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 4bce4a4..7fb4f94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -327,7 +327,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 			}
 
 			// Register this job with the library cache manager
-			libraryCacheManager.register(job.getJobID(), job.getUserJarBlobKeys());
+			libraryCacheManager.registerJob(job.getJobID(), job.getUserJarBlobKeys());
 			
 			// get the existing execution graph (if we attach), or construct a new empty one to attach
 			executionGraph = this.currentJobs.get(job.getJobID());
@@ -425,7 +425,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 			// job was not prperly removed by the fail call
 			if(currentJobs.contains(job.getJobID())){
 				currentJobs.remove(job.getJobID());
-				libraryCacheManager.unregister(job.getJobID());
+				libraryCacheManager.unregisterJob(job.getJobID());
 			}
 
 			return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(t));
@@ -524,7 +524,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 			this.currentJobs.remove(jid);
 			
 			try {
-				libraryCacheManager.unregister(jid);
+				libraryCacheManager.unregisterJob(jid);
 			}
 			catch (Throwable t) {
 				LOG.warn("Could not properly unregister job " + jid + " from the library cache.");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a6152c37/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index 74ea220..1380d91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -574,7 +574,6 @@ public class TaskManager implements TaskOperationProtocol {
 		final int numSubtasks = tdd.getCurrentNumberOfSubtasks();
 		
 		Task task = null;
-		boolean jarsRegistered = false;
 		
 		// check if the taskmanager is shut down or disconnected
 		if (shutdownStarted.get()) {
@@ -586,8 +585,7 @@ public class TaskManager implements TaskOperationProtocol {
 		
 		try {
 			// Now register data with the library manager
-			libraryCacheManager.register(jobID, tdd.getRequiredJarFiles());
-			jarsRegistered = true;
+			libraryCacheManager.registerTask(jobID, executionId, tdd.getRequiredJarFiles());
 			
 			// library and classloader issues first
 			final ClassLoader userCodeClassLoader = libraryCacheManager.getClassLoader(jobID);
@@ -659,9 +657,8 @@ public class TaskManager implements TaskOperationProtocol {
 				if (task != null) {
 					removeAllTaskResources(task);
 				}
-				if (jarsRegistered) {
-					libraryCacheManager.unregister(jobID);
-				}
+				
+				libraryCacheManager.unregisterTask(jobID, executionId);
 			}
 			catch (Throwable t2) {
 				LOG.error("Error during cleanup of task deployment", t2);
@@ -690,7 +687,7 @@ public class TaskManager implements TaskOperationProtocol {
 		removeAllTaskResources(task);
 
 		// Unregister task from library cache manager
-		libraryCacheManager.unregister(task.getJobID());
+		libraryCacheManager.unregisterTask(task.getJobID(), executionId);
 	}
 	
 	private void removeAllTaskResources(Task task) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a6152c37/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index df32a81..67dd8a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -60,18 +60,18 @@ public class BlobLibraryCacheManagerTest {
 			keys.add(bc.put(buf));
 
 			libraryCacheManager = new BlobLibraryCacheManager(server, GlobalConfiguration.getConfiguration());
-			libraryCacheManager.register(jid, keys);
+			libraryCacheManager.registerJob(jid, keys);
 
 			List<File> files = new ArrayList<File>();
 
-			for(BlobKey key: keys){
+			for (BlobKey key: keys){
 				files.add(libraryCacheManager.getFile(key));
 			}
 
 			assertEquals(2, files.size());
 			files.clear();
 
-			libraryCacheManager.unregister(jid);
+			libraryCacheManager.unregisterJob(jid);
 
 			Thread.sleep(1500);
 


Mime
View raw message