flink-commits mailing list archives

From ches...@apache.org
Subject [1/9] flink git commit: [FLINK-6008] Collection of BlobServer improvements
Date Tue, 04 Jul 2017 17:10:19 GMT
Repository: flink
Updated Branches:
  refs/heads/master 85853edab -> 4d18afed8


[FLINK-6008] Collection of BlobServer improvements

- [docs] update some config options to the new, non-deprecated ones
- [docs] improvements in the BlobService docs

- use Preconditions.checkArgument in BlobClient
- refactor BlobCache#getURL()
- do not fail the BlobServer if delete fails
- fix concurrent job directory creation
- do not guard a delete() call with a check for existence
- cleanup javadocs in (Blob)LibraryCacheManager
- cleanup tests for BlobLibraryCacheManager

This closes #4146.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28c3354e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28c3354e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28c3354e

Branch: refs/heads/master
Commit: 28c3354e735e98657d8d90912234cf8e896e78d6
Parents: 85853ed
Author: Nico Kruber <nico@data-artisans.com>
Authored: Fri Jan 6 18:42:58 2017 +0100
Committer: zentol <chesnay@apache.org>
Committed: Tue Jul 4 14:57:25 2017 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |   6 +-
 .../apache/flink/runtime/blob/BlobCache.java    |  82 +++------
 .../apache/flink/runtime/blob/BlobClient.java   |  18 +-
 .../apache/flink/runtime/blob/BlobServer.java   |   6 +-
 .../runtime/blob/BlobServerConnection.java      |  18 +-
 .../apache/flink/runtime/blob/BlobService.java  |   7 +-
 .../apache/flink/runtime/blob/BlobUtils.java    |   4 +-
 .../librarycache/BlobLibraryCacheManager.java   |  22 +++
 .../librarycache/LibraryCacheManager.java       |  24 ++-
 .../runtime/blob/BlobServerDeleteTest.java      | 165 ++++++++++++++++---
 .../flink/runtime/blob/BlobServerPutTest.java   | 105 ++++++++++++
 .../BlobLibraryCacheManagerTest.java            | 145 ++++++++++++++--
 12 files changed, 464 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28c3354e/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 1511f34..b4bf845 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -510,6 +510,8 @@ May be set to -1 to disable this feature.
  - `none` (default): No high availability. A single JobManager runs and no JobManager state is checkpointed.
  - `zookeeper`: Supports the execution of multiple JobManagers and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution. In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state. In order to use the 'zookeeper' mode, it is mandatory to also define the `high-availability.zookeeper.quorum` configuration value.
 
+- `high-availability.cluster-id`: (Default `/default_ns` in standalone cluster mode, or the <yarn-application-id> under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace` and `high-availability.zookeeper.path.namespace`.
+
 Previously this key was named `recovery.mode` and the default value was `standalone`.
 
 #### ZooKeeper-based HA Mode
@@ -518,13 +520,11 @@ Previously this key was named `recovery.mode` and the default value was `standal
 
 - `high-availability.zookeeper.path.root`: (Default `/flink`) Defines the root dir under which the ZooKeeper HA mode will create namespace directories. Previously this key was named `recovery.zookeeper.path.root`.
 
-- `high-availability.zookeeper.path.namespace`: (Default `/default_ns` in standalone cluster mode, or the <yarn-application-id> under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace`.
-
 - `high-availability.zookeeper.path.latch`: (Default `/leaderlatch`) Defines the znode of the leader latch which is used to elect the leader. Previously this key was named `recovery.zookeeper.path.latch`.
 
 - `high-availability.zookeeper.path.leader`: (Default `/leader`) Defines the znode of the leader which contains the URL to the leader and the current leader session ID. Previously this key was named `recovery.zookeeper.path.leader`.
 
-- `high-availability.zookeeper.storageDir`: Defines the directory in the state backend where the JobManager metadata will be stored (ZooKeeper only keeps pointers to it). Required for HA. Previously this key was named `recovery.zookeeper.storageDir`.
+- `high-availability.storageDir`: Defines the directory in the state backend where the JobManager metadata will be stored (ZooKeeper only keeps pointers to it). Required for HA. Previously this key was named `recovery.zookeeper.storageDir` and `high-availability.zookeeper.storageDir`.
 
 - `high-availability.zookeeper.client.session-timeout`: (Default `60000`) Defines the session timeout for the ZooKeeper session in ms. Previously this key was named `recovery.zookeeper.client.session-timeout`
 

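For reference, the renamed keys documented above might be combined in a `flink-conf.yaml` fragment like the following (hostnames, paths and the cluster id are illustrative placeholders, not values from the commit):

```yaml
# Illustrative HA configuration using the new, non-deprecated key names
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.cluster-id: /my_flink_cluster
high-availability.storageDir: hdfs:///flink/ha/
```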
http://git-wip-us.apache.org/repos/asf/flink/blob/28c3354e/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index aa47eae..32bd8fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.blob;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -142,78 +141,45 @@ public final class BlobCache implements BlobService {
 
 		// fallback: download from the BlobServer
 		final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
+		LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
 
 		// loop over retries
 		int attempt = 0;
 		while (true) {
-
-			if (attempt == 0) {
-				LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
-			} else {
-				LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt);
-			}
-
-			try {
-				BlobClient bc = null;
-				InputStream is = null;
-				OutputStream os = null;
-
-				try {
-					bc = new BlobClient(serverAddress, blobClientConfig);
-					is = bc.get(requiredBlob);
-					os = new FileOutputStream(localJarFile);
-
-					while (true) {
-						final int read = is.read(buf);
-						if (read < 0) {
-							break;
-						}
-						os.write(buf, 0, read);
-					}
-
-					// we do explicitly not use a finally block, because we want the closing
-					// in the regular case to throw exceptions and cause the writing to fail.
-					// But, the closing on exception should not throw further exceptions and
-					// let us keep the root exception
-					os.close();
-					os = null;
-					is.close();
-					is = null;
-					bc.close();
-					bc = null;
-
-					// success, we finished
-					return localJarFile.toURI().toURL();
-				}
-				catch (Throwable t) {
-					// we use "catch (Throwable)" to keep the root exception. Otherwise that exception
-					// it would be replaced by any exception thrown in the finally block
-					IOUtils.closeQuietly(os);
-					IOUtils.closeQuietly(is);
-					IOUtils.closeQuietly(bc);
-
-					if (t instanceof IOException) {
-						throw (IOException) t;
-					} else {
-						throw new IOException(t.getMessage(), t);
+			try (
+				final BlobClient bc = new BlobClient(serverAddress, blobClientConfig);
+				final InputStream is = bc.get(requiredBlob);
+				final OutputStream os = new FileOutputStream(localJarFile)
+			) {
+				while (true) {
+					final int read = is.read(buf);
+					if (read < 0) {
+						break;
 					}
+					os.write(buf, 0, read);
 				}
+
+				// success, we finished
+				return localJarFile.toURI().toURL();
 			}
-			catch (IOException e) {
+			catch (Throwable t) {
 				String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress +
 					" and store it under " + localJarFile.getAbsolutePath();
 				if (attempt < numFetchRetries) {
-					attempt++;
 					if (LOG.isDebugEnabled()) {
-						LOG.debug(message + " Retrying...", e);
+						LOG.debug(message + " Retrying...", t);
 					} else {
 						LOG.error(message + " Retrying...");
 					}
 				}
 				else {
-					LOG.error(message + " No retries left.", e);
-					throw new IOException(message, e);
+					LOG.error(message + " No retries left.", t);
+					throw new IOException(message, t);
 				}
+
+				// retry
+				++attempt;
+				LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt);
 			}
 		} // end loop over retries
 	}
@@ -225,8 +191,8 @@ public final class BlobCache implements BlobService {
 	public void delete(BlobKey key) throws IOException{
 		final File localFile = BlobUtils.getStorageLocation(storageDir, key);
 
-		if (localFile.exists() && !localFile.delete()) {
-			LOG.warn("Failed to delete locally cached BLOB " + key + " at " + localFile.getAbsolutePath());
+		if (!localFile.delete() && localFile.exists()) {
+			LOG.warn("Failed to delete locally cached BLOB {} at {}", key, localFile.getAbsolutePath());
 		}
 	}
 

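The BlobCache refactor above replaces manual close-and-null bookkeeping with try-with-resources and restructures the retry loop. A minimal, self-contained sketch of both patterns under illustrative names (the `Supplier`-based source stands in for the real `BlobClient` connection):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.function.Supplier;

public class RetryingDownload {

    // Copies all bytes from in to out. try-with-resources closes both streams
    // even on failure, while close() errors on the success path still propagate,
    // which is the behavior the hand-written close sequence tried to achieve.
    public static void copy(InputStream in, OutputStream out) throws IOException {
        byte[] buf = new byte[4096];
        try (InputStream is = in; OutputStream os = out) {
            int read;
            while ((read = is.read(buf)) >= 0) {
                os.write(buf, 0, read);
            }
        }
    }

    // Retry loop shaped like the refactored logic: attempt the transfer and
    // rethrow only once the retry budget is exhausted.
    public static byte[] downloadWithRetries(Supplier<InputStream> source, int numRetries)
            throws IOException {
        int attempt = 0;
        while (true) {
            ByteArrayOutputStream os = new ByteArrayOutputStream();
            try {
                copy(source.get(), os);
                return os.toByteArray();
            } catch (Throwable t) {
                if (attempt >= numRetries) {
                    throw new IOException("no retries left", t);
                }
                ++attempt; // the real code logs "(retry {attempt})" here
            }
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = downloadWithRetries(
            () -> new ByteArrayInputStream("hello blob".getBytes()), 2);
        System.out.println(new String(data)); // prints "hello blob"
    }
}
```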
http://git-wip-us.apache.org/repos/asf/flink/blob/28c3354e/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index fab3c5c..0f02e75 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -64,6 +64,7 @@ import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
 import static org.apache.flink.runtime.blob.BlobUtils.readFully;
 import static org.apache.flink.runtime.blob.BlobUtils.readLength;
 import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
 * The BLOB client can communicate with the BLOB server and either upload (PUT), download (GET),
@@ -593,9 +594,7 @@ public final class BlobClient implements Closeable {
 	 *         the BLOB server or if the BLOB server cannot delete the file
 	 */
 	public void delete(BlobKey key) throws IOException {
-		if (key == null) {
-			throw new IllegalArgumentException("BLOB key must not be null");
-		}
+		checkArgument(key != null, "BLOB key must not be null.");
 
 		deleteInternal(null, null, key);
 	}
@@ -611,12 +610,9 @@ public final class BlobClient implements Closeable {
 	 *         thrown if an I/O error occurs while transferring the request to the BLOB server
 	 */
 	public void delete(JobID jobId, String key) throws IOException {
-		if (jobId == null) {
-			throw new IllegalArgumentException("JobID must not be null");
-		}
-		if (key == null) {
-			throw new IllegalArgumentException("Key must not be null");
-		}
+		checkArgument(jobId != null, "Job id must not be null.");
+		checkArgument(key != null, "BLOB name must not be null.");
+		
 		if (key.length() > MAX_KEY_LENGTH) {
 			throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH);
 		}
@@ -633,9 +629,7 @@ public final class BlobClient implements Closeable {
 	 *         thrown if an I/O error occurs while transferring the request to the BLOB server
 	 */
 	public void deleteAll(JobID jobId) throws IOException {
-		if (jobId == null) {
-			throw new IllegalArgumentException("Argument jobID must not be null");
-		}
+		checkArgument(jobId != null, "Job id must not be null.");
 
 		deleteInternal(jobId, null, null);
 	}

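The `checkArgument` changes above collapse each null check into one line. A sketch of how that utility behaves, using a simplified stand-in for `org.apache.flink.util.Preconditions.checkArgument` (the real class offers further overloads):

```java
public class PreconditionsDemo {

    // Simplified stand-in for Preconditions.checkArgument: throws
    // IllegalArgumentException with the given message when the condition is false.
    public static void checkArgument(boolean condition, Object errorMessage) {
        if (!condition) {
            throw new IllegalArgumentException(String.valueOf(errorMessage));
        }
    }

    // Mirrors the shape of the refactored BlobClient.delete(BlobKey).
    public static void delete(String key) {
        checkArgument(key != null, "BLOB key must not be null.");
        // ... issue the DELETE request to the server ...
    }

    public static void main(String[] args) {
        delete("some-key"); // passes the precondition
        try {
            delete(null);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints "BLOB key must not be null."
        }
    }
}
```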
http://git-wip-us.apache.org/repos/asf/flink/blob/28c3354e/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 5ad4b6a..3b3a39b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -411,10 +411,8 @@ public class BlobServer extends Thread implements BlobService {
 		readWriteLock.writeLock().lock();
 
 		try {
-			if (localFile.exists()) {
-				if (!localFile.delete()) {
-					LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
-				}
+			if (!localFile.delete() && localFile.exists()) {
+				LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
 			}
 
 			blobStore.delete(key);

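The reordered condition above (`!delete() && exists()` instead of `exists() && !delete()`) removes a race: checking existence first can misreport a failure when another thread deletes the file between the two calls. A minimal sketch of the race-free ordering (class and method names are illustrative):

```java
import java.io.File;
import java.io.IOException;

public class SafeDelete {

    // Race-free "delete if present": try the delete first and consult exists()
    // only to classify a failure. A false return here is a genuine failure;
    // a file that was already gone is not treated as an error.
    public static boolean deleteIfPresent(File f) {
        return f.delete() || !f.exists();
    }

    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("blob", ".bin");
        System.out.println(deleteIfPresent(tmp)); // true: existed, now deleted
        System.out.println(deleteIfPresent(tmp)); // true: already gone, not an error
    }
}
```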
http://git-wip-us.apache.org/repos/asf/flink/blob/28c3354e/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index a76dbd5..ad33c5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -509,21 +509,7 @@ class BlobServerConnection extends Thread {
 
 			if (type == CONTENT_ADDRESSABLE) {
 				BlobKey key = BlobKey.readFromInputStream(inputStream);
-				File blobFile = blobServer.getStorageLocation(key);
-
-				writeLock.lock();
-
-				try {
-					// we should make the local and remote file deletion atomic, otherwise we might risk not
-					// removing the remote file in case of a concurrent put operation
-					if (blobFile.exists() && !blobFile.delete()) {
-						throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
-					}
-
-					blobStore.delete(key);
-				} finally {
-					writeLock.unlock();
-				}
+				blobServer.delete(key);
 			}
 			else if (type == NAME_ADDRESSABLE) {
 				byte[] jidBytes = new byte[JobID.SIZE];
@@ -540,7 +526,7 @@ class BlobServerConnection extends Thread {
 					// we should make the local and remote file deletion atomic, otherwise we might risk not
 					// removing the remote file in case of a concurrent put operation
 					if (blobFile.exists() && !blobFile.delete()) {
-						throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
+						LOG.warn("Cannot delete local BLOB file " + blobFile.getAbsolutePath());
 					}
 
 					blobStore.delete(jobID, key);

http://git-wip-us.apache.org/repos/asf/flink/blob/28c3354e/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
index 97a2d51..c1447c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
@@ -28,17 +28,18 @@ import java.net.URL;
 public interface BlobService extends Closeable {
 
 	/**
-	 * This method returns the URL of the file associated with the provided blob key.
+	 * Returns the URL of the file associated with the provided blob key.
 	 *
 	 * @param key blob key associated with the requested file
 	 * @return The URL to the file.
-	 * @throws IOException
+	 * @throws java.io.FileNotFoundException when the path does not exist;
+	 * @throws IOException if any other error occurs when retrieving the file
 	 */
 	URL getURL(BlobKey key) throws IOException;
 
 
 	/**
-	 * This method deletes the file associated with the provided blob key.
+	 * Deletes the file associated with the provided blob key.
 	 *
 	 * @param key associated with the file to be deleted
 	 * @throws IOException

http://git-wip-us.apache.org/repos/asf/flink/blob/28c3354e/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 8da362d..9d538cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -227,7 +227,9 @@ public class BlobUtils {
 	private static File getJobDirectory(File storageDir, JobID jobID) {
 		final File jobDirectory = new File(storageDir, JOB_DIR_PREFIX + jobID.toString());
 
-		if (!jobDirectory.exists() && !jobDirectory.mkdirs()) {
+		// note: thread-safe create should try to mkdir first and then ignore the case that the
+		//       directory already existed
+		if (!jobDirectory.mkdirs() && !jobDirectory.exists()) {
 			throw new RuntimeException("Could not create jobId directory '" + jobDirectory.getAbsolutePath() + "'.");
 		}
 

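The fixed `getJobDirectory` above applies the same attempt-first idiom to directory creation. A self-contained sketch (names are illustrative):

```java
import java.io.File;
import java.nio.file.Files;

public class SafeMkdir {

    // Thread-safe directory creation: call mkdirs() first and treat it as a
    // failure only if the directory still does not exist afterwards. When two
    // threads race to create the same job directory, the loser's mkdirs()
    // returns false but exists() is then true, so neither thread fails.
    public static File ensureDirectory(File dir) {
        if (!dir.mkdirs() && !dir.exists()) {
            throw new RuntimeException(
                "Could not create directory '" + dir.getAbsolutePath() + "'.");
        }
        return dir;
    }

    public static void main(String[] args) throws Exception {
        File base = Files.createTempDirectory("jobs").toFile();
        File jobDir = new File(base, "job_42");
        ensureDirectory(jobDir); // first call creates the directory
        ensureDirectory(jobDir); // second call is a harmless no-op
        System.out.println(jobDir.isDirectory()); // prints "true"
    }
}
```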
http://git-wip-us.apache.org/repos/asf/flink/blob/28c3354e/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 0702a11..0c4cb85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -48,6 +48,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * a set of libraries (typically JAR files) which the job requires to run. The library cache manager
  * caches library files in order to avoid unnecessary retransmission of data. It is based on a singleton
  * programming pattern, so there exists at most one library manager at a time.
+ * <p>
+ * All files registered via {@link #registerJob(JobID, Collection, Collection)} are reference-counted
+ * and are removed by a timer-based cleanup task if their reference counter is zero.
+ * <strong>NOTE:</strong> this does not apply to files that enter the blob service via
+ * {@link #getFile(BlobKey)}!
  */
 public final class BlobLibraryCacheManager extends TimerTask implements LibraryCacheManager {
 
@@ -73,6 +78,12 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 	
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * Creates the blob library cache manager.
+	 *
+	 * @param blobService blob file retrieval service to use
+	 * @param cleanupInterval cleanup interval in milliseconds
+	 */
 	public BlobLibraryCacheManager(BlobService blobService, long cleanupInterval) {
 		this.blobService = checkNotNull(blobService);
 
@@ -191,6 +202,17 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 		}
 	}
 
+	/**
+	 * Returns a file handle to the file identified by the blob key.
+	 * <p>
+	 * <strong>NOTE:</strong> if not already registered during
+	 * {@link #registerJob(JobID, Collection, Collection)}, files that enter the library cache /
+	 * backing blob store using this method will not be reference-counted and garbage-collected!
+	 *
+	 * @param blobKey identifying the requested file
+	 * @return File handle
+	 * @throws IOException if any error occurs when retrieving the file
+	 */
 	@Override
 	public File getFile(BlobKey blobKey) throws IOException {
 		return new File(blobService.getURL(blobKey).getFile());

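The new javadoc above describes the reference-counting scheme: files registered via `registerJob` are counted, and a timer-based cleanup removes entries whose count has dropped to zero. A minimal sketch of that scheme (class and method names are illustrative, not the manager's real internals):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class RefCountedCache {

    // key -> reference count; entries at zero are candidates for the periodic
    // cleanup sweep (the real manager runs the sweep as a TimerTask).
    private final ConcurrentHashMap<String, AtomicInteger> refCounts =
        new ConcurrentHashMap<>();

    public void register(String key) {
        refCounts.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
    }

    public void unregister(String key) {
        AtomicInteger count = refCounts.get(key);
        if (count != null && count.decrementAndGet() <= 0) {
            // the real manager defers removal to the cleanup task; we remove
            // eagerly here to keep the sketch short
            refCounts.remove(key, count);
        }
    }

    public int count(String key) {
        AtomicInteger count = refCounts.get(key);
        return count == null ? 0 : count.get();
    }

    public static void main(String[] args) {
        RefCountedCache cache = new RefCountedCache();
        cache.register("lib.jar");
        cache.register("lib.jar");
        cache.unregister("lib.jar");
        System.out.println(cache.count("lib.jar")); // prints "1"
    }
}
```

Files fetched directly via `getFile(BlobKey)` bypass this accounting entirely, which is exactly the caveat the new NOTE calls out.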
http://git-wip-us.apache.org/repos/asf/flink/blob/28c3354e/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
index 03c8f27..bf05271 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
@@ -41,7 +41,7 @@ public interface LibraryCacheManager {
 	 *
 	 * @param blobKey identifying the requested file
 	 * @return File handle
-	 * @throws IOException
+	 * @throws IOException if any error occurs when retrieving the file
 	 */
 	File getFile(BlobKey blobKey) throws IOException;
 
@@ -51,7 +51,9 @@ public interface LibraryCacheManager {
 	 * @param id job ID
 	 * @param requiredJarFiles collection of blob keys identifying the required jar files
 	 * @param requiredClasspaths collection of classpaths that are added to the user code class loader
-	 * @throws IOException
+	 * @throws IOException if any error occurs when retrieving the required jar files
+	 *
+	 * @see #unregisterJob(JobID) counterpart of this method
 	 */
 	void registerJob(JobID id, Collection<BlobKey> requiredJarFiles, Collection<URL> requiredClasspaths)
 			throws IOException;
@@ -63,21 +65,35 @@ public interface LibraryCacheManager {
 	 * @param requiredJarFiles collection of blob keys identifying the required jar files
 	 * @param requiredClasspaths collection of classpaths that are added to the user code class loader
 	 * @throws IOException
+	 *
+	 * @see #unregisterTask(JobID, ExecutionAttemptID) counterpart of this method
 	 */
 	void registerTask(JobID id, ExecutionAttemptID execution, Collection<BlobKey> requiredJarFiles,
 			Collection<URL> requiredClasspaths) throws IOException;
 
 	/**
-	 * Unregisters a job from the library cache manager.
+	 * Unregisters a job task execution from the library cache manager.
+	 * <p>
+	 * <strong>Note:</strong> this is the counterpart of {@link #registerTask(JobID,
+	 * ExecutionAttemptID, Collection, Collection)} and it will not remove any job added via
+	 * {@link #registerJob(JobID, Collection, Collection)}!
 	 *
 	 * @param id job ID
+	 *
+	 * @see #registerTask(JobID, ExecutionAttemptID, Collection, Collection) counterpart of this method
 	 */
 	void unregisterTask(JobID id, ExecutionAttemptID execution);
-	
+
 	/**
 	 * Unregisters a job from the library cache manager.
+	 * <p>
+	 * <strong>Note:</strong> this is the counterpart of {@link #registerJob(JobID, Collection,
+	 * Collection)} and it will not remove any job task execution added via {@link
+	 * #registerTask(JobID, ExecutionAttemptID, Collection, Collection)}!
 	 *
 	 * @param id job ID
+	 *
+	 * @see #registerJob(JobID, Collection, Collection) counterpart of this method
 	 */
 	void unregisterJob(JobID id);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/28c3354e/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index 5e1d86e..15e2c7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -53,7 +54,7 @@ public class BlobServerDeleteTest extends TestLogger {
 	private final Random rnd = new Random();
 
 	@Test
-	public void testDeleteSingle() {
+	public void testDeleteSingleByBlobKey() {
 		BlobServer server = null;
 		BlobClient client = null;
 		BlobStore blobStore = new VoidBlobStore();
@@ -72,7 +73,13 @@ public class BlobServerDeleteTest extends TestLogger {
 			BlobKey key = client.put(data);
 			assertNotNull(key);
 
-			// issue a DELETE request
+			// second item
+			data[0] ^= 1;
+			BlobKey key2 = client.put(data);
+			assertNotNull(key2);
+			assertNotEquals(key, key2);
+
+			// issue a DELETE request via the client
 			client.delete(key);
 			client.close();
 
@@ -92,6 +99,69 @@ public class BlobServerDeleteTest extends TestLogger {
 			catch (IllegalStateException e) {
 				// expected
 			}
+
+			// delete a file directly on the server
+			server.delete(key2);
+			try {
+				server.getURL(key2);
+				fail("BLOB should have been deleted");
+			}
+			catch (IOException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			cleanup(server, client);
+		}
+	}
+
+	@Test
+	public void testDeleteSingleByName() {
+		BlobServer server = null;
+		BlobClient client = null;
+		BlobStore blobStore = new VoidBlobStore();
+
+		try {
+			Configuration config = new Configuration();
+			server = new BlobServer(config, blobStore);
+
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+			client = new BlobClient(serverAddress, config);
+
+			byte[] data = new byte[2000000];
+			rnd.nextBytes(data);
+
+			JobID jobID = new JobID();
+			String name1 = "random name";
+			String name2 = "any nyme";
+
+			client.put(jobID, name1, data);
+			client.put(jobID, name2, data);
+
+			// issue a DELETE request via the client
+			client.delete(jobID, name1);
+			client.close();
+
+			client = new BlobClient(serverAddress, config);
+			try {
+				client.get(jobID, name1);
+				fail("BLOB should have been deleted");
+			}
+			catch (IOException e) {
+				// expected
+			}
+
+			try {
+				client.put(new byte[1]);
+				fail("client should be closed after erroneous operation");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -125,9 +195,15 @@ public class BlobServerDeleteTest extends TestLogger {
 			// put content addressable (like libraries)
 			client.put(jobID, name1, data);
 			client.put(jobID, name2, new byte[712]);
+			// items for a second (different!) job ID
+			final byte[] jobIdBytes = jobID.getBytes();
+			jobIdBytes[0] ^= 1;
+			JobID jobID2 = JobID.fromByteArray(jobIdBytes);
+			client.put(jobID2, name1, data);
+			client.put(jobID2, name2, new byte[712]);
 
 
-			// issue a DELETE ALL request
+			// issue a DELETE ALL request via the client
 			client.deleteAll(jobID);
 			client.close();
 
@@ -189,13 +265,16 @@ public class BlobServerDeleteTest extends TestLogger {
 			File blobFile = server.getStorageLocation(key);
 			assertTrue(blobFile.delete());
 
-			// issue a DELETE request
+			// issue a DELETE request via the client
 			try {
 				client.delete(key);
 			}
 			catch (IOException e) {
 				fail("DELETE operation should not fail if file is already deleted");
 			}
+
+			// issue a DELETE request on the server
+			server.delete(key);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -230,7 +309,7 @@ public class BlobServerDeleteTest extends TestLogger {
 			File blobFile = server.getStorageLocation(jid, name);
 			assertTrue(blobFile.delete());
 
-			// issue a DELETE request
+			// issue a DELETE request via the client
 			try {
 				client.delete(jid, name);
 			}
@@ -248,13 +327,15 @@ public class BlobServerDeleteTest extends TestLogger {
 	}
 
 	@Test
-	public void testDeleteFails() {
+	public void testDeleteFailsByBlobKey() {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
 		BlobServer server = null;
 		BlobClient client = null;
 		BlobStore blobStore = new VoidBlobStore();
 
+		File blobFile = null;
+		File directory = null;
 		try {
 			Configuration config = new Configuration();
 			server = new BlobServer(config, blobStore);
@@ -269,30 +350,74 @@ public class BlobServerDeleteTest extends TestLogger {
 			BlobKey key = client.put(data);
 			assertNotNull(key);
 
-			File blobFile = server.getStorageLocation(key);
-			File directory = blobFile.getParentFile();
+			blobFile = server.getStorageLocation(key);
+			directory = blobFile.getParentFile();
 
 			assertTrue(blobFile.setWritable(false, false));
 			assertTrue(directory.setWritable(false, false));
 
-			// issue a DELETE request
-			try {
-				client.delete(key);
-				fail("DELETE operation should fail if file cannot be deleted");
-			}
-			catch (IOException e) {
-				// expected
-			}
-			finally {
+			// issue a DELETE request via the client
+			client.delete(key);
+
+			// issue a DELETE request on the server
+			server.delete(key);
+
+			// the file should still be there
+			server.getURL(key);
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		} finally {
+			if (blobFile != null && directory != null) {
 				blobFile.setWritable(true, false);
 				directory.setWritable(true, false);
 			}
+			cleanup(server, client);
 		}
-		catch (Exception e) {
+	}
+
+	@Test
+	public void testDeleteByNameFails() {
+		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
+
+		BlobServer server = null;
+		BlobClient client = null;
+		BlobStore blobStore = new VoidBlobStore();
+
+		File blobFile = null;
+		File directory = null;
+		try {
+			Configuration config = new Configuration();
+			server = new BlobServer(config, blobStore);
+
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+			client = new BlobClient(serverAddress, config);
+
+			byte[] data = new byte[2000000];
+			rnd.nextBytes(data);
+
+			JobID jid = new JobID();
+			String name = "------------fdghljEgRJHF+##4U789Q345";
+
+			client.put(jid, name, data);
+
+			blobFile = server.getStorageLocation(jid, name);
+			directory = blobFile.getParentFile();
+
+			assertTrue(blobFile.setWritable(false, false));
+			assertTrue(directory.setWritable(false, false));
+
+			// issue a DELETE request via the client
+			client.delete(jid, name);
+
+			// the file should still be there
+			client.get(jid, name);
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
-		}
-		finally {
+		} finally {
+			blobFile.setWritable(true, false);
+			directory.setWritable(true, false);
 			cleanup(server, client);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/28c3354e/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index 35ef968..31e63a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
@@ -59,6 +60,110 @@ public class BlobServerPutTest extends TestLogger {
 
 	private final Random rnd = new Random();
 
+
+	// --- concurrency tests for utility methods which could fail during the put operation ---
+
+	/**
+	 * Checked thread that calls {@link BlobServer#getStorageLocation(BlobKey)}
+	 */
+	public static class ContentAddressableGetStorageLocation extends CheckedThread {
+		private final BlobServer server;
+		private final BlobKey key;
+
+		public ContentAddressableGetStorageLocation(BlobServer server, BlobKey key) {
+			this.server = server;
+			this.key = key;
+		}
+
+		@Override
+		public void go() throws Exception {
+			server.getStorageLocation(key);
+		}
+	}
+
+	/**
+	 * Tests concurrent calls to {@link BlobServer#getStorageLocation(BlobKey)}.
+	 */
+	@Test
+	public void testServerContentAddressableGetStorageLocationConcurrent() throws Exception {
+		BlobServer server = new BlobServer(new Configuration(), new VoidBlobStore());
+
+		try {
+			BlobKey key = new BlobKey();
+			CheckedThread[] threads = new CheckedThread[] {
+				new ContentAddressableGetStorageLocation(server, key),
+				new ContentAddressableGetStorageLocation(server, key),
+				new ContentAddressableGetStorageLocation(server, key)
+			};
+			checkedThreadSimpleTest(threads);
+		} finally {
+			server.close();
+		}
+	}
+
+	/**
+	 * Helper method to first start all threads and then wait for their completion.
+	 *
+	 * @param threads threads to use
+	 * @throws Exception exceptions that are thrown from the threads
+	 */
+	protected void checkedThreadSimpleTest(CheckedThread[] threads)
+		throws Exception {
+
+		// start all threads
+		for (CheckedThread t: threads) {
+			t.start();
+		}
+
+		// wait for thread completion and check exceptions
+		for (CheckedThread t: threads) {
+			t.sync();
+		}
+	}
+
+	/**
+	 * Checked thread that calls {@link BlobServer#getStorageLocation(JobID, String)}
+	 */
+	public static class NameAddressableGetStorageLocation extends CheckedThread {
+		private final BlobServer server;
+		private final JobID jid;
+		private final String name;
+
+		public NameAddressableGetStorageLocation(BlobServer server, JobID jid, String name) {
+			this.server = server;
+			this.jid = jid;
+			this.name = name;
+		}
+
+		@Override
+		public void go() throws Exception {
+			server.getStorageLocation(jid, name);
+		}
+	}
+
+	/**
+	 * Tests concurrent calls to {@link BlobServer#getStorageLocation(JobID, String)}.
+	 */
+	@Test
+	public void testServerNameAddressableGetStorageLocationConcurrent() throws Exception {
+		BlobServer server = new BlobServer(new Configuration(), new VoidBlobStore());
+
+		try {
+			JobID jid = new JobID();
+			String stringKey = "my test key";
+			CheckedThread[] threads = new CheckedThread[] {
+				new NameAddressableGetStorageLocation(server, jid, stringKey),
+				new NameAddressableGetStorageLocation(server, jid, stringKey),
+				new NameAddressableGetStorageLocation(server, jid, stringKey)
+			};
+			checkedThreadSimpleTest(threads);
+		} finally {
+			server.close();
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
 	@Test
 	public void testPutBufferSuccessful() throws IOException {
 		BlobServer server = null;
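
The concurrency tests above rely on Flink's CheckedThread from flink-core's test utilities: each thread records any exception from its body and rethrows it when the starting thread calls sync(). A minimal stand-in (not the actual Flink class) showing the start-all-then-sync-all pattern of checkedThreadSimpleTest:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CheckedThreadSketch {

	/** Minimal stand-in for Flink's CheckedThread: captures and rethrows errors. */
	abstract static class CheckedThread extends Thread {
		private volatile Throwable error;

		/** The thread body; any exception it throws is surfaced via {@link #sync()}. */
		public abstract void go() throws Exception;

		@Override
		public final void run() {
			try {
				go();
			} catch (Throwable t) {
				error = t;
			}
		}

		/** Waits for the thread to finish and rethrows any captured error. */
		public void sync() throws Exception {
			join();
			if (error != null) {
				throw new Exception("thread failed", error);
			}
		}
	}

	public static void main(String[] args) throws Exception {
		final AtomicInteger calls = new AtomicInteger();

		CheckedThread[] threads = new CheckedThread[3];
		for (int i = 0; i < threads.length; i++) {
			threads[i] = new CheckedThread() {
				@Override
				public void go() {
					calls.incrementAndGet(); // stands in for getStorageLocation(key)
				}
			};
		}

		// start all threads first, so they actually run concurrently ...
		for (CheckedThread t : threads) {
			t.start();
		}
		// ... then wait for completion and surface any exception
		for (CheckedThread t : threads) {
			t.sync();
		}

		if (calls.get() != 3) {
			throw new AssertionError("expected 3 calls, got " + calls.get());
		}
	}
}
```

Without the sync() step, an exception thrown inside a plain Thread would only reach the uncaught-exception handler and the test would pass silently; capturing and rethrowing is what makes the concurrent getStorageLocation calls actually assertable.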

http://git-wip-us.apache.org/repos/asf/flink/blob/28c3354e/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 98e6b3e..73f5819 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -43,8 +43,12 @@ import java.util.List;
 
 public class BlobLibraryCacheManagerTest {
 
+	/**
+	 * Tests that the {@link BlobLibraryCacheManager} cleans up after calling {@link
+	 * BlobLibraryCacheManager#unregisterJob(JobID)}.
+	 */
 	@Test
-	public void testLibraryCacheManagerCleanup() throws IOException, InterruptedException {
+	public void testLibraryCacheManagerJobCleanup() throws IOException, InterruptedException {
 
 		JobID jid = new JobID();
 		List<BlobKey> keys = new ArrayList<BlobKey>();
@@ -67,14 +71,9 @@ public class BlobLibraryCacheManagerTest {
 			libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval);
 			libraryCacheManager.registerJob(jid, keys, Collections.<URL>emptyList());
 
-			List<File> files = new ArrayList<File>();
-
-			for (BlobKey key : keys) {
-				files.add(libraryCacheManager.getFile(key));
-			}
-
-			assertEquals(2, files.size());
-			files.clear();
+			assertEquals(2, checkFilesExist(keys, libraryCacheManager, true));
+			assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries());
+			assertEquals(1, libraryCacheManager.getNumberOfReferenceHolders(jid));
 
 			libraryCacheManager.unregisterJob(jid);
 
@@ -86,25 +85,137 @@ public class BlobLibraryCacheManagerTest {
 					Thread.sleep(500);
 				}
 				while (libraryCacheManager.getNumberOfCachedLibraries() > 0 &&
-						System.currentTimeMillis() < deadline);
+					System.currentTimeMillis() < deadline);
 			}
 
 			// this fails if we exited via a timeout
 			assertEquals(0, libraryCacheManager.getNumberOfCachedLibraries());
+			assertEquals(0, libraryCacheManager.getNumberOfReferenceHolders(jid));
 
-			int caughtExceptions = 0;
+			// the blob cache should no longer contain the files
+			assertEquals(0, checkFilesExist(keys, libraryCacheManager, false));
 
-			for (BlobKey key : keys) {
-				// the blob cache should no longer contain the files
+			try {
+				bc.get(jid, "test");
+				fail("name-addressable BLOB should have been deleted");
+			} catch (IOException e) {
+				// expected
+			}
+
+			bc.close();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (server != null) {
+				server.close();
+			}
+
+			if (libraryCacheManager != null) {
 				try {
-					files.add(libraryCacheManager.getFile(key));
+					libraryCacheManager.shutdown();
 				}
-				catch (IOException ioe) {
-					caughtExceptions++;
+				catch (IOException e) {
+					e.printStackTrace();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Checks how many of the files given by blob keys are accessible.
+	 *
+	 * @param keys
+	 * 		blob keys to check
+	 * @param libraryCacheManager
+	 * 		cache manager to use
+	 * @param doThrow
+	 * 		whether exceptions should be ignored (<tt>false</tt>) or thrown (<tt>true</tt>)
+	 *
+	 * @return number of files we were able to retrieve via {@link BlobLibraryCacheManager#getFile(BlobKey)}
+	 */
+	private int checkFilesExist(
+			List<BlobKey> keys, BlobLibraryCacheManager libraryCacheManager, boolean doThrow)
+			throws IOException {
+		int numFiles = 0;
+
+		for (BlobKey key : keys) {
+			try {
+				libraryCacheManager.getFile(key);
+				++numFiles;
+			} catch (IOException e) {
+				if (doThrow) {
+					throw e;
+				}
+			}
+		}
+
+		return numFiles;
+	}
+
+	/**
+	 * Tests that the {@link BlobLibraryCacheManager} cleans up after calling {@link
+	 * BlobLibraryCacheManager#unregisterTask(JobID, ExecutionAttemptID)}.
+	 */
+	@Test
+	public void testLibraryCacheManagerTaskCleanup() throws IOException, InterruptedException {
+
+		JobID jid = new JobID();
+		ExecutionAttemptID executionId1 = new ExecutionAttemptID();
+		ExecutionAttemptID executionId2 = new ExecutionAttemptID();
+		List<BlobKey> keys = new ArrayList<BlobKey>();
+		BlobServer server = null;
+		BlobLibraryCacheManager libraryCacheManager = null;
+
+		final byte[] buf = new byte[128];
+
+		try {
+			Configuration config = new Configuration();
+			server = new BlobServer(config, new VoidBlobStore());
+			InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort());
+			BlobClient bc = new BlobClient(blobSocketAddress, config);
+
+			keys.add(bc.put(buf));
+			buf[0] += 1;
+			keys.add(bc.put(buf));
+			bc.put(jid, "test", buf);
+
+			long cleanupInterval = 1000L;
+			libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval);
+			libraryCacheManager.registerTask(jid, executionId1, keys, Collections.<URL>emptyList());
+			libraryCacheManager.registerTask(jid, executionId2, keys, Collections.<URL>emptyList());
+
+			assertEquals(2, checkFilesExist(keys, libraryCacheManager, true));
+			assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries());
+			assertEquals(2, libraryCacheManager.getNumberOfReferenceHolders(jid));
+
+			libraryCacheManager.unregisterTask(jid, executionId1);
+
+			assertEquals(2, checkFilesExist(keys, libraryCacheManager, true));
+			assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries());
+			assertEquals(1, libraryCacheManager.getNumberOfReferenceHolders(jid));
+
+			libraryCacheManager.unregisterTask(jid, executionId2);
+
+			// because we cannot guarantee that there are no thread races in the build system, we
+			// loop for a certain while until the references disappear
+			{
+				long deadline = System.currentTimeMillis() + 30000;
+				do {
+					Thread.sleep(100);
 				}
+				while (libraryCacheManager.getNumberOfCachedLibraries() > 0 &&
+						System.currentTimeMillis() < deadline);
 			}
 
-			assertEquals(2, caughtExceptions);
+			// this fails if we exited via a timeout
+			assertEquals(0, libraryCacheManager.getNumberOfCachedLibraries());
+			assertEquals(0, libraryCacheManager.getNumberOfReferenceHolders(jid));
+
+			// the blob cache should no longer contain the files
+			assertEquals(0, checkFilesExist(keys, libraryCacheManager, false));
 
 			bc.close();
 		} finally {
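
Both cleanup tests wait for the asynchronous library cleanup with the same deadline-polling loop: sleep, re-check the condition, and give up only after 30 seconds. Factored out as a plain-Java sketch (the helper name and the simulated cleanup are illustrative, not Flink API):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class DeadlinePollSketch {

	/**
	 * Polls {@code condition} every {@code intervalMillis} until it holds or
	 * {@code timeoutMillis} elapse; returns whether it eventually held.
	 */
	static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long intervalMillis)
			throws InterruptedException {
		long deadline = System.currentTimeMillis() + timeoutMillis;
		do {
			if (condition.getAsBoolean()) {
				return true;
			}
			Thread.sleep(intervalMillis);
		} while (System.currentTimeMillis() < deadline);
		// one final check, in case the condition flipped during the last sleep
		return condition.getAsBoolean();
	}

	public static void main(String[] args) throws Exception {
		AtomicInteger cachedLibraries = new AtomicInteger(2);

		// a background "cleanup" drops the count shortly after start,
		// standing in for the BlobLibraryCacheManager's cleanup timer
		new Thread(() -> {
			try {
				Thread.sleep(200);
			} catch (InterruptedException ignored) {
			}
			cachedLibraries.set(0);
		}).start();

		boolean cleaned = waitUntil(() -> cachedLibraries.get() == 0, 30_000, 100);

		// this fails only if we exited via the timeout
		if (!cleaned || cachedLibraries.get() != 0) {
			throw new AssertionError("cleanup did not complete before deadline");
		}
	}
}
```

Asserting the count only after the poll loop (rather than immediately after unregister) is what makes the test robust against scheduling races on slow build machines, at the cost of a bounded wait.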

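The task-cleanup test exercises per-job reference counting: each registerTask(jobId, executionId) adds a holder, each unregisterTask removes one, and the cached libraries become eligible for cleanup only when the count reaches zero. A reduced sketch of that bookkeeping (class and method names are illustrative, not Flink's):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class RefCountedCacheSketch {
	// job -> set of execution attempts currently holding references
	private final Map<String, Set<String>> holdersPerJob = new HashMap<>();

	/** Registers an execution as a reference holder for the job's libraries. */
	void registerTask(String jobId, String executionId) {
		holdersPerJob.computeIfAbsent(jobId, k -> new HashSet<>()).add(executionId);
	}

	/** Removes one holder; the job's entry disappears once no holders remain. */
	void unregisterTask(String jobId, String executionId) {
		Set<String> holders = holdersPerJob.get(jobId);
		if (holders != null) {
			holders.remove(executionId);
			if (holders.isEmpty()) {
				holdersPerJob.remove(jobId); // libraries now eligible for cleanup
			}
		}
	}

	int getNumberOfReferenceHolders(String jobId) {
		Set<String> holders = holdersPerJob.get(jobId);
		return holders == null ? 0 : holders.size();
	}

	public static void main(String[] args) {
		RefCountedCacheSketch cache = new RefCountedCacheSketch();
		cache.registerTask("job-1", "exec-1");
		cache.registerTask("job-1", "exec-2");
		if (cache.getNumberOfReferenceHolders("job-1") != 2) {
			throw new AssertionError("expected 2 holders");
		}

		cache.unregisterTask("job-1", "exec-1");
		if (cache.getNumberOfReferenceHolders("job-1") != 1) {
			throw new AssertionError("expected 1 holder");
		}

		cache.unregisterTask("job-1", "exec-2");
		if (cache.getNumberOfReferenceHolders("job-1") != 0) {
			throw new AssertionError("expected 0 holders");
		}
	}
}
```

This mirrors the test's assertions on getNumberOfReferenceHolders before and after each unregisterTask call; registering the same execution twice is idempotent here because holders are kept in a set.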
