flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-7054] [blob] Remove LibraryCacheManager#getFile()
Date Tue, 08 Aug 2017 14:14:49 GMT
Repository: flink
Updated Branches:
  refs/heads/master 8c7b3d9af -> 849990c00


[FLINK-7054] [blob] Remove LibraryCacheManager#getFile()

This was only used in tests where it is avoidable but if used anywhere else, it
may have caused cleanup issues.

This closes #4235.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/849990c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/849990c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/849990c0

Branch: refs/heads/master
Commit: 849990c00c1e7979b366088dbdba3f02a6123445
Parents: 8c7b3d9
Author: Nico Kruber <nico@data-artisans.com>
Authored: Wed Jun 21 14:45:31 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Aug 8 16:08:31 2017 +0200

----------------------------------------------------------------------
 .../librarycache/BlobLibraryCacheManager.java   | 36 +++++---------------
 .../FallbackLibraryCacheManager.java            |  9 +----
 .../librarycache/LibraryCacheManager.java       | 10 ------
 .../BlobLibraryCacheManagerTest.java            | 23 +++++++------
 .../BlobLibraryCacheRecoveryITCase.java         |  6 ++--
 5 files changed, 24 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/849990c0/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 0c4cb85..0387725 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -18,7 +18,14 @@
 
 package org.apache.flink.runtime.execution.librarycache;
 
-import java.io.File;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobService;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.net.URL;
 import java.util.Arrays;
@@ -32,15 +39,6 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 
-import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.blob.BlobService;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.util.ExceptionUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -51,8 +49,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * <p>
  * All files registered via {@link #registerJob(JobID, Collection, Collection)} are reference-counted
  * and are removed by a timer-based cleanup task if their reference counter is zero.
- * <strong>NOTE:</strong> this does not apply to files that enter the blob service
via
- * {@link #getFile(BlobKey)}!
  */
 public final class BlobLibraryCacheManager extends TimerTask implements LibraryCacheManager
{
 
@@ -202,22 +198,6 @@ public final class BlobLibraryCacheManager extends TimerTask implements
LibraryC
 		}
 	}
 
-	/**
-	 * Returns a file handle to the file identified by the blob key.
-	 * <p>
-	 * <strong>NOTE:</strong> if not already registered during
-	 * {@link #registerJob(JobID, Collection, Collection)}, files that enter the library cache
/
-	 * backing blob store using this method will not be reference-counted and garbage-collected!
-	 *
-	 * @param blobKey identifying the requested file
-	 * @return File handle
-	 * @throws IOException if any error occurs when retrieving the file
-	 */
-	@Override
-	public File getFile(BlobKey blobKey) throws IOException {
-		return new File(blobService.getURL(blobKey).getFile());
-	}
-
 	public int getBlobServerPort() {
 		return blobService.getPort();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/849990c0/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
index 1ef6e31..8e14e58 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
@@ -18,14 +18,12 @@
 
 package org.apache.flink.runtime.execution.librarycache;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.api.common.JobID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
 import java.net.URL;
 import java.util.Collection;
 
@@ -39,11 +37,6 @@ public class FallbackLibraryCacheManager implements LibraryCacheManager
{
 	}
 
 	@Override
-	public File getFile(BlobKey blobKey) throws IOException {
-		throw new IOException("There is no file associated to the blob key " + blobKey);
-	}
-
-	@Override
 	public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles, Collection<URL>
requiredClasspaths) {
 		LOG.warn("FallbackLibraryCacheManager cannot download files associated with blob keys.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/849990c0/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
index bf05271..5f9f443 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
@@ -22,7 +22,6 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.URL;
 import java.util.Collection;
@@ -37,15 +36,6 @@ public interface LibraryCacheManager {
 	ClassLoader getClassLoader(JobID id);
 
 	/**
-	 * Returns a file handle to the file identified by the blob key.
-	 *
-	 * @param blobKey identifying the requested file
-	 * @return File handle
-	 * @throws IOException if any error occurs when retrieving the file
-	 */
-	File getFile(BlobKey blobKey) throws IOException;
-
-	/**
 	 * Registers a job with its required jar files and classpaths. The jar files are identified
by their blob keys.
 	 *
 	 * @param id job ID

http://git-wip-us.apache.org/repos/asf/flink/blob/849990c0/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 9d2bd55..606d8c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
@@ -82,7 +83,7 @@ public class BlobLibraryCacheManagerTest {
 			libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval);
 			libraryCacheManager.registerJob(jid, keys, Collections.<URL>emptyList());
 
-			assertEquals(2, checkFilesExist(keys, libraryCacheManager, true));
+			assertEquals(2, checkFilesExist(keys, server, true));
 			assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries());
 			assertEquals(1, libraryCacheManager.getNumberOfReferenceHolders(jid));
 
@@ -104,7 +105,7 @@ public class BlobLibraryCacheManagerTest {
 			assertEquals(0, libraryCacheManager.getNumberOfReferenceHolders(jid));
 
 			// the blob cache should no longer contain the files
-			assertEquals(0, checkFilesExist(keys, libraryCacheManager, false));
+			assertEquals(0, checkFilesExist(keys, server, false));
 
 			try {
 				server.getURL(keys.get(0));
@@ -144,21 +145,21 @@ public class BlobLibraryCacheManagerTest {
 	 *
 	 * @param keys
 	 * 		blob keys to check
-	 * @param libraryCacheManager
-	 * 		cache manager to use
+	 * @param blobService
+	 * 		BLOB store to use
 	 * @param doThrow
 	 * 		whether exceptions should be ignored (<tt>false</tt>), or throws (<tt>true</tt>)
 	 *
-	 * @return number of files we were able to retrieve via {@link BlobLibraryCacheManager#getFile(BlobKey)}
+	 * @return number of files we were able to retrieve via {@link BlobService#getURL(BlobKey)}
 	 */
-	private int checkFilesExist(
-			List<BlobKey> keys, BlobLibraryCacheManager libraryCacheManager, boolean doThrow)
+	private static int checkFilesExist(
+		List<BlobKey> keys, BlobService blobService, boolean doThrow)
 			throws IOException {
 		int numFiles = 0;
 
 		for (BlobKey key : keys) {
 			try {
-				libraryCacheManager.getFile(key);
+				blobService.getURL(key);
 				++numFiles;
 			} catch (IOException e) {
 				if (doThrow) {
@@ -204,13 +205,13 @@ public class BlobLibraryCacheManagerTest {
 			libraryCacheManager.registerTask(jid, executionId1, keys, Collections.<URL>emptyList());
 			libraryCacheManager.registerTask(jid, executionId2, keys, Collections.<URL>emptyList());
 
-			assertEquals(2, checkFilesExist(keys, libraryCacheManager, true));
+			assertEquals(2, checkFilesExist(keys, server, true));
 			assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries());
 			assertEquals(2, libraryCacheManager.getNumberOfReferenceHolders(jid));
 
 			libraryCacheManager.unregisterTask(jid, executionId1);
 
-			assertEquals(2, checkFilesExist(keys, libraryCacheManager, true));
+			assertEquals(2, checkFilesExist(keys, server, true));
 			assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries());
 			assertEquals(1, libraryCacheManager.getNumberOfReferenceHolders(jid));
 
@@ -232,7 +233,7 @@ public class BlobLibraryCacheManagerTest {
 			assertEquals(0, libraryCacheManager.getNumberOfReferenceHolders(jid));
 
 			// the blob cache should no longer contain the files
-			assertEquals(0, checkFilesExist(keys, libraryCacheManager, false));
+			assertEquals(0, checkFilesExist(keys, server, false));
 
 			bc.close();
 		} finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/849990c0/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index 02f121b..e5efd19 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -107,7 +107,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			libServer[0].registerTask(jobId, executionId, keys, Collections.<URL>emptyList());
 
 			// Verify key 1
-			File f = libCache.getFile(keys.get(0));
+			File f = new File(cache.getURL(keys.get(0)).toURI());
 			assertEquals(expected.length, f.length());
 
 			try (FileInputStream fis = new FileInputStream(f)) {
@@ -126,7 +126,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			libCache = new BlobLibraryCacheManager(cache, 3600 * 1000);
 
 			// Verify key 1
-			f = libCache.getFile(keys.get(0));
+			f = new File(cache.getURL(keys.get(0)).toURI());
 			assertEquals(expected.length, f.length());
 
 			try (FileInputStream fis = new FileInputStream(f)) {
@@ -138,7 +138,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			}
 
 			// Verify key 2
-			f = libCache.getFile(keys.get(1));
+			f = new File(cache.getURL(keys.get(1)).toURI());
 			assertEquals(256, f.length());
 
 			try (FileInputStream fis = new FileInputStream(f)) {


Mime
View raw message