flink-commits mailing list archives

From trohrm...@apache.org
Subject [2/3] flink git commit: [FLINK-7056][blob] add API to allow job-related BLOBs to be stored
Date Mon, 14 Aug 2017 09:18:08 GMT
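
This commit extends the BLOB store API so that BLOBs can optionally be associated with a job: the client and server operations gain a leading JobID parameter, with null denoting a job-unrelated BLOB (the previous, content-only behaviour). A minimal sketch of the new client-side usage, pieced together from the test changes below (serverAddress and config are assumed to point at a running BlobServer; error handling elided):

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.InetSocketAddress;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.blob.BlobClient;
    import org.apache.flink.runtime.blob.BlobKey;

    // a hedged sketch, not part of the commit: assumes a BlobServer listening at serverAddress
    static void example(InetSocketAddress serverAddress, Configuration config) throws IOException {
        try (BlobClient client = new BlobClient(serverAddress, config)) {
            JobID jobId = new JobID();
            byte[] data = {1, 2, 3};

            // store a job-related BLOB; passing null instead stores it job-unrelated
            BlobKey key = client.put(jobId, data);

            // retrieve it again via the matching job-related overload
            try (InputStream is = client.get(jobId, key)) {
                // ... read the BLOB contents
            }

            // remove only the job-related copy
            client.delete(jobId, key);
        }
    }
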
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
index d06f76f..1216be2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -40,69 +42,123 @@ import static org.junit.Assert.assertTrue;
 /**
  * This class contains unit tests for the {@link BlobCache}.
  */
-public class BlobCacheSuccessTest {
+public class BlobCacheSuccessTest extends TestLogger {
 
 	@Rule
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	/**
-	 * BlobCache with no HA. BLOBs need to be downloaded form a working
+	 * BlobCache with no HA, job-unrelated BLOBs. BLOBs need to be downloaded from a working
 	 * BlobServer.
 	 */
 	@Test
-	public void testBlobCache() throws IOException {
+	public void testBlobNoJobCache() throws IOException {
 		Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 
-		uploadFileGetTest(config, false, false);
+		uploadFileGetTest(config, null, false, false);
+	}
+
+	/**
+	 * BlobCache with no HA, job-related BLOBs. BLOBs need to be downloaded from a working
+	 * BlobServer.
+	 */
+	@Test
+	public void testBlobForJobCache() throws IOException {
+		Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+			temporaryFolder.newFolder().getAbsolutePath());
+
+		uploadFileGetTest(config, new JobID(), false, false);
 	}
 
 	/**
 	 * BlobCache is configured in HA mode and the cache can download files from
 	 * the file system directly and does not need to download BLOBs from the
-	 * BlobServer.
+	 * BlobServer. Using job-unrelated BLOBs.
+	 */
+	@Test
+	public void testBlobNoJobCacheHa() throws IOException {
+		testBlobCacheHa(null);
+	}
+
+	/**
+	 * BlobCache is configured in HA mode and the cache can download files from
+	 * the file system directly and does not need to download BLOBs from the
+	 * BlobServer. Using job-related BLOBs.
 	 */
 	@Test
-	public void testBlobCacheHa() throws IOException {
+	public void testBlobForJobCacheHa() throws IOException {
+		testBlobCacheHa(new JobID());
+	}
+
+	private void testBlobCacheHa(final JobID jobId) throws IOException {
 		Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
 			temporaryFolder.newFolder().getPath());
-		uploadFileGetTest(config, true, true);
+		uploadFileGetTest(config, jobId, true, true);
 	}
 
 	/**
 	 * BlobCache is configured in HA mode and the cache can download files from
 	 * the file system directly and does not need to download BLOBs from the
-	 * BlobServer.
+	 * BlobServer. Using job-unrelated BLOBs.
+	 */
+	@Test
+	public void testBlobNoJobCacheHa2() throws IOException {
+		testBlobCacheHa2(null);
+	}
+
+	/**
+	 * BlobCache is configured in HA mode and the cache can download files from
+	 * the file system directly and does not need to download BLOBs from the
+	 * BlobServer. Using job-related BLOBs.
 	 */
 	@Test
-	public void testBlobCacheHa2() throws IOException {
+	public void testBlobForJobCacheHa2() throws IOException {
+		testBlobCacheHa2(new JobID());
+	}
+
+	private void testBlobCacheHa2(JobID jobId) throws IOException {
 		Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
 			temporaryFolder.newFolder().getPath());
-		uploadFileGetTest(config, false, true);
+		uploadFileGetTest(config, jobId, false, true);
 	}
 
 	/**
 	 * BlobCache is configured in HA mode but the cache itself cannot access the
-	 * file system and thus needs to download BLOBs from the BlobServer.
+	 * file system and thus needs to download BLOBs from the BlobServer. Using job-unrelated BLOBs.
 	 */
 	@Test
-	public void testBlobCacheHaFallback() throws IOException {
+	public void testBlobNoJobCacheHaFallback() throws IOException {
+		testBlobCacheHaFallback(null);
+	}
+
+	/**
+	 * BlobCache is configured in HA mode but the cache itself cannot access the
+	 * file system and thus needs to download BLOBs from the BlobServer. Using job-related BLOBs.
+	 */
+	@Test
+	public void testBlobForJobCacheHaFallback() throws IOException {
+		testBlobCacheHaFallback(new JobID());
+	}
+
+	private void testBlobCacheHaFallback(final JobID jobId) throws IOException {
 		Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
 			temporaryFolder.newFolder().getPath());
-		uploadFileGetTest(config, false, false);
+		uploadFileGetTest(config, jobId, false, false);
 	}
 
 	/**
@@ -119,7 +175,7 @@ public class BlobCacheSuccessTest {
 	 * 		whether the cache should have access to a shared <tt>HA_STORAGE_PATH</tt> (only useful with
 	 * 		HA mode)
 	 */
-	private void uploadFileGetTest(final Configuration config, boolean shutdownServerAfterUpload,
+	private void uploadFileGetTest(final Configuration config, JobID jobId, boolean shutdownServerAfterUpload,
 			boolean cacheHasAccessToFs) throws IOException {
 		Preconditions.checkArgument(!shutdownServerAfterUpload || cacheHasAccessToFs);
 
@@ -154,9 +210,9 @@ public class BlobCacheSuccessTest {
 
 				blobClient = new BlobClient(serverAddress, config);
 
-				blobKeys.add(blobClient.put(buf));
+				blobKeys.add(blobClient.put(jobId, buf));
 				buf[0] = 1; // Make sure the BLOB key changes
-				blobKeys.add(blobClient.put(buf));
+				blobKeys.add(blobClient.put(jobId, buf));
 			} finally {
 				if (blobClient != null) {
 					blobClient.close();
@@ -172,7 +228,11 @@ public class BlobCacheSuccessTest {
 			blobCache = new BlobCache(serverAddress, cacheConfig, blobStoreService);
 
 			for (BlobKey blobKey : blobKeys) {
-				blobCache.getFile(blobKey);
+				if (jobId == null) {
+					blobCache.getFile(blobKey);
+				} else {
+					blobCache.getFile(jobId, blobKey);
+				}
 			}
 
 			if (blobServer != null) {
@@ -184,7 +244,11 @@ public class BlobCacheSuccessTest {
 			final File[] files = new File[blobKeys.size()];
 
 			for(int i = 0; i < blobKeys.size(); i++){
-				files[i] = blobCache.getFile(blobKeys.get(i));
+				if (jobId == null) {
+					files[i] = blobCache.getFile(blobKeys.get(i));
+				} else {
+					files[i] = blobCache.getFile(jobId, blobKeys.get(i));
+				}
 			}
 
 			// Verify the result

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 2932f41..cfec4c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -38,6 +38,7 @@ import java.security.MessageDigest;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.api.common.JobID;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -144,7 +145,7 @@ public class BlobClientTest {
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while reading the input stream
 	 */
-	private static void validateGet(final InputStream inputStream, final byte[] buf) throws IOException {
+	static void validateGet(final InputStream inputStream, final byte[] buf) throws IOException {
 		byte[] receivedBuffer = new byte[buf.length];
 
 		int bytesReceived = 0;
@@ -220,13 +221,20 @@ public class BlobClientTest {
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
 			client = new BlobClient(serverAddress, getBlobClientConfig());
 
+			JobID jobId = new JobID();
+
 			// Store the data
-			BlobKey receivedKey = client.put(testBuffer);
+			BlobKey receivedKey = client.put(null, testBuffer);
+			assertEquals(origKey, receivedKey);
+			// try again with a job-related BLOB:
+			receivedKey = client.put(jobId, testBuffer);
 			assertEquals(origKey, receivedKey);
 
 			// Retrieve the data
 			InputStream is = client.get(receivedKey);
 			validateGet(is, testBuffer);
+			is = client.get(jobId, receivedKey);
+			validateGet(is, testBuffer);
 
 			// Check reaction to invalid keys
 			try {
@@ -236,6 +244,15 @@ public class BlobClientTest {
 			catch (IOException fnfe) {
 				// expected
 			}
+			// new client needed (closed from failure above)
+			client = new BlobClient(serverAddress, getBlobClientConfig());
+			try {
+				client.get(jobId, new BlobKey());
+				fail("Expected IOException did not occur");
+			}
+			catch (IOException fnfe) {
+				// expected
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -276,10 +293,16 @@ public class BlobClientTest {
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
 			client = new BlobClient(serverAddress, getBlobClientConfig());
 
+			JobID jobId = new JobID();
+
 			// Store the data
 			is = new FileInputStream(testFile);
 			BlobKey receivedKey = client.put(is);
 			assertEquals(origKey, receivedKey);
+			// try again with a job-related BLOB:
+			is = new FileInputStream(testFile);
+			receivedKey = client.put(jobId, is);
+			assertEquals(origKey, receivedKey);
 
 			is.close();
 			is = null;
@@ -287,6 +310,8 @@ public class BlobClientTest {
 			// Retrieve the data
 			is = client.get(receivedKey);
 			validateGet(is, testFile);
+			is = client.get(jobId, receivedKey);
+			validateGet(is, testFile);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -324,6 +349,13 @@ public class BlobClientTest {
 
 		InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort());
 
+		uploadJarFile(serverAddress, blobClientConfig, testFile);
+		uploadJarFile(serverAddress, blobClientConfig, testFile);
+	}
+
+	private static void uploadJarFile(
+		final InetSocketAddress serverAddress, final Configuration blobClientConfig,
+		final File testFile) throws IOException {
 		List<BlobKey> blobKeys = BlobClient.uploadJarFiles(serverAddress, blobClientConfig,
 			Collections.singletonList(new Path(testFile.toURI())));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index 3c7711d..81304f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -94,11 +94,17 @@ public class BlobRecoveryITCase extends TestLogger {
 
 			BlobKey[] keys = new BlobKey[2];
 
-			// Put data
-			keys[0] = client.put(expected); // Request 1
-			keys[1] = client.put(expected, 32, 256); // Request 2
+			// Put job-unrelated data
+			keys[0] = client.put(null, expected); // Request 1
+			keys[1] = client.put(null, expected, 32, 256); // Request 2
 
+			// Put job-related data, verify that the checksums match
 			JobID[] jobId = new JobID[] { new JobID(), new JobID() };
+			BlobKey key;
+			key = client.put(jobId[0], expected); // Request 3
+			assertEquals(keys[0], key);
+			key = client.put(jobId[1], expected, 32, 256); // Request 4
+			assertEquals(keys[1], key);
 
 			// check that the storage directory exists
 			final Path blobServerPath = new Path(storagePath, "blob");
@@ -130,9 +136,31 @@ public class BlobRecoveryITCase extends TestLogger {
 				}
 			}
 
+			// Verify request 3
+			try (InputStream is = client.get(jobId[0], keys[0])) {
+				byte[] actual = new byte[expected.length];
+				BlobUtils.readFully(is, actual, 0, expected.length, null);
+
+				for (int i = 0; i < expected.length; i++) {
+					assertEquals(expected[i], actual[i]);
+				}
+			}
+
+			// Verify request 4
+			try (InputStream is = client.get(jobId[1], keys[1])) {
+				byte[] actual = new byte[256];
+				BlobUtils.readFully(is, actual, 0, 256, null);
+
+				for (int i = 32, j = 0; i < 256; i++, j++) {
+					assertEquals(expected[i], actual[j]);
+				}
+			}
+
 			// Remove again
 			client.delete(keys[0]);
 			client.delete(keys[1]);
+			client.delete(jobId[0], keys[0]);
+			client.delete(jobId[1], keys[1]);
 
 			// Verify everything is clean
 			assertTrue("HA storage directory does not exist", fs.exists(new Path(storagePath)));

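The checksum assertions for Requests 3 and 4 above pin down a key property of the new API: BlobKeys remain purely content-addressed, so job-related and job-unrelated uploads of the same bytes yield equal keys, while the copies themselves are stored and removed per namespace. Condensed into a sketch (reusing the client and expected buffer from the test above, so illustrative only):

    JobID jobA = new JobID();
    JobID jobB = new JobID();

    BlobKey k0 = client.put(null, expected);  // job-unrelated copy
    BlobKey kA = client.put(jobA, expected);  // job-related copy for job A
    BlobKey kB = client.put(jobB, expected);  // job-related copy for job B

    // content-addressed: identical data yields identical keys ...
    assertEquals(k0, kA);
    assertEquals(kA, kB);

    // ... but each copy must be removed in its own namespace
    client.delete(k0);
    client.delete(jobA, kA);
    client.delete(jobB, kB);
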
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index ce4574b..d91aae42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
@@ -39,6 +40,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
@@ -75,35 +77,45 @@ public class BlobServerDeleteTest extends TestLogger {
 			byte[] data = new byte[2000000];
 			rnd.nextBytes(data);
 
-			// put content addressable (like libraries)
-			BlobKey key = client.put(data);
-			assertNotNull(key);
+			// put job-unrelated (like libraries)
+			BlobKey key1 = client.put(null, data);
+			assertNotNull(key1);
 
-			// second item
+			// second job-unrelated item
 			data[0] ^= 1;
-			BlobKey key2 = client.put(data);
+			BlobKey key2 = client.put(null, data);
 			assertNotNull(key2);
-			assertNotEquals(key, key2);
+			assertNotEquals(key1, key2);
+
+			// put job-related with same key1 as non-job-related
+			data[0] ^= 1; // back to the original data
+			final JobID jobId = new JobID();
+			BlobKey key1b = client.put(jobId, data);
+			assertNotNull(key1b);
+			assertEquals(key1, key1b);
 
 			// issue a DELETE request via the client
-			client.delete(key);
+			client.delete(key1);
 			client.close();
 
 			client = new BlobClient(serverAddress, config);
 			try {
-				client.get(key);
+				client.get(key1);
 				fail("BLOB should have been deleted");
 			}
 			catch (IOException e) {
 				// expected
 			}
 
+			ensureClientIsClosed(client);
+
+			client = new BlobClient(serverAddress, config);
 			try {
-				client.put(new byte[1]);
-				fail("client should be closed after erroneous operation");
+				client.get(jobId, key1);
 			}
-			catch (IllegalStateException e) {
+			catch (IOException e) {
 				// expected
+				fail("Deleting a job-unrelated BLOB should not affect a job-related BLOB with the same key");
 			}
 
 			// delete a file directly on the server
@@ -125,8 +137,29 @@ public class BlobServerDeleteTest extends TestLogger {
 		}
 	}
 
+	private static void ensureClientIsClosed(final BlobClient client) throws IOException {
+		try {
+			client.put(null, new byte[1]);
+			fail("client should be closed after erroneous operation");
+		}
+		catch (IllegalStateException e) {
+			// expected
+		} finally {
+			client.close();
+		}
+	}
+
+	@Test
+	public void testDeleteAlreadyDeletedNoJob() {
+		testDeleteAlreadyDeleted(null);
+	}
+
 	@Test
-	public void testDeleteAlreadyDeletedByBlobKey() {
+	public void testDeleteAlreadyDeletedForJob() {
+		testDeleteAlreadyDeleted(new JobID());
+	}
+
+	private void testDeleteAlreadyDeleted(final JobID jobId) {
 		BlobServer server = null;
 		BlobClient client = null;
 		BlobStore blobStore = new VoidBlobStore();
@@ -143,23 +176,27 @@ public class BlobServerDeleteTest extends TestLogger {
 			byte[] data = new byte[2000000];
 			rnd.nextBytes(data);
 
-			// put content addressable (like libraries)
-			BlobKey key = client.put(data);
+			// put file
+			BlobKey key = client.put(jobId, data);
 			assertNotNull(key);
 
-			File blobFile = server.getStorageLocation(key);
+			File blobFile = server.getStorageLocation(jobId, key);
 			assertTrue(blobFile.delete());
 
 			// issue a DELETE request via the client
 			try {
-				client.delete(key);
+				deleteHelper(client, jobId, key);
 			}
 			catch (IOException e) {
 				fail("DELETE operation should not fail if file is already deleted");
 			}
 
 			// issue a DELETE request on the server
-			server.delete(key);
+			if (jobId == null) {
+				server.delete(key);
+			} else {
+				server.delete(jobId, key);
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -170,8 +207,25 @@ public class BlobServerDeleteTest extends TestLogger {
 		}
 	}
 
+	private static void deleteHelper(BlobClient client, JobID jobId, BlobKey key) throws IOException {
+		if (jobId == null) {
+			client.delete(key);
+		} else {
+			client.delete(jobId, key);
+		}
+	}
+
 	@Test
-	public void testDeleteByBlobKeyFails() {
+	public void testDeleteFailsNoJob() {
+		testDeleteFails(null);
+	}
+
+	@Test
+	public void testDeleteFailsForJob() {
+		testDeleteFails(new JobID());
+	}
+
+	private void testDeleteFails(final JobID jobId) {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
 		BlobServer server = null;
@@ -193,29 +247,39 @@ public class BlobServerDeleteTest extends TestLogger {
 			rnd.nextBytes(data);
 
 			// put content addressable (like libraries)
-			BlobKey key = client.put(data);
+			BlobKey key = client.put(jobId, data);
 			assertNotNull(key);
 
-			blobFile = server.getStorageLocation(key);
+			blobFile = server.getStorageLocation(jobId, key);
 			directory = blobFile.getParentFile();
 
 			assertTrue(blobFile.setWritable(false, false));
 			assertTrue(directory.setWritable(false, false));
 
 			// issue a DELETE request via the client
-			client.delete(key);
+			deleteHelper(client, jobId, key);
 
 			// issue a DELETE request on the server
-			server.delete(key);
+			if (jobId == null) {
+				server.delete(key);
+			} else {
+				server.delete(jobId, key);
+			}
 
 			// the file should still be there
-			server.getFile(key);
+			if (jobId == null) {
+				server.getFile(key);
+			} else {
+				server.getFile(jobId, key);
+			}
 		} catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		} finally {
 			if (blobFile != null && directory != null) {
+				//noinspection ResultOfMethodCallIgnored
 				blobFile.setWritable(true, false);
+				//noinspection ResultOfMethodCallIgnored
 				directory.setWritable(true, false);
 			}
 			cleanup(server, client);
@@ -233,10 +297,29 @@ public class BlobServerDeleteTest extends TestLogger {
 	 * broken.
 	 */
 	@Test
-	public void testConcurrentDeleteOperations() throws IOException, ExecutionException, InterruptedException {
+	public void testConcurrentDeleteOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
+		testConcurrentDeleteOperations(null);
+	}
+
+	/**
+	 * FLINK-6020
+	 *
+	 * Tests that concurrent delete operations don't interfere with each other.
+	 *
+	 * Note: The test checks that there cannot be two threads which have checked whether a given blob file exists
+	 * and then one of them fails deleting it. Without the introduced lock, this situation should rarely happen
+	 * and make this test fail. Thus, if this test should become "unstable", then the delete atomicity is most likely
+	 * broken.
+	 */
+	@Test
+	public void testConcurrentDeleteOperationsForJob() throws IOException, ExecutionException, InterruptedException {
+		testConcurrentDeleteOperations(new JobID());
+	}
+
+	private void testConcurrentDeleteOperations(final JobID jobId)
+		throws IOException, InterruptedException, ExecutionException {
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
-
 		final BlobStore blobStore = mock(BlobStore.class);
 
 		final int concurrentDeleteOperations = 3;
@@ -251,16 +334,16 @@ public class BlobServerDeleteTest extends TestLogger {
 			final BlobKey blobKey;
 
 			try (BlobClient client = blobServer.createClient()) {
-				blobKey = client.put(data);
+				blobKey = client.put(jobId, data);
 			}
 
-			assertTrue(blobServer.getStorageLocation(blobKey).exists());
+			assertTrue(blobServer.getStorageLocation(jobId, blobKey).exists());
 
 			for (int i = 0; i < concurrentDeleteOperations; i++) {
 				CompletableFuture<Void> deleteFuture = CompletableFuture.supplyAsync(
 					() -> {
 						try (BlobClient blobClient = blobServer.createClient()) {
-							blobClient.delete(blobKey);
+							deleteHelper(blobClient, jobId, blobKey);
 						} catch (IOException e) {
 							throw new FlinkFutureException("Could not delete the given blob key " + blobKey + '.', e);
 						}
@@ -278,13 +361,13 @@ public class BlobServerDeleteTest extends TestLogger {
 			// in case of no lock, one of the delete operations should eventually fail
 			waitFuture.get();
 
-			assertFalse(blobServer.getStorageLocation(blobKey).exists());
+			assertFalse(blobServer.getStorageLocation(jobId, blobKey).exists());
 		} finally {
 			executor.shutdownNow();
 		}
 	}
 
-	private void cleanup(BlobServer server, BlobClient client) {
+	private static void cleanup(BlobServer server, BlobClient client) {
 		if (client != null) {
 			try {
 				client.close();

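The first test above also fixes the isolation semantics of DELETE: removing a job-unrelated BLOB must leave a job-related BLOB with the same content-addressed key intact. A sketch under the same assumptions as the test (note that a failed GET closes the client, which is why the test re-creates it):

    BlobKey key = client.put(null, data);  // job-unrelated copy
    JobID jobId = new JobID();
    client.put(jobId, data);               // job-related copy under the same key

    client.delete(key);                    // removes only the job-unrelated copy

    client.get(jobId, key);                // still succeeds
    client.get(key);                       // now throws IOException
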
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index bd27d70..5ad8d95 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
@@ -69,7 +70,28 @@ public class BlobServerGetTest extends TestLogger {
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	@Test
-	public void testGetFailsDuringLookup() throws IOException {
+	public void testGetFailsDuringLookup1() throws IOException {
+		testGetFailsDuringLookup(null, new JobID());
+	}
+
+	@Test
+	public void testGetFailsDuringLookup2() throws IOException {
+		testGetFailsDuringLookup(new JobID(), new JobID());
+	}
+
+	@Test
+	public void testGetFailsDuringLookup3() throws IOException {
+		testGetFailsDuringLookup(new JobID(), null);
+	}
+
+	/**
+	 * Checks the correct result if a GET operation fails during the lookup of the file.
+	 *
+	 * @param jobId1 first job ID or <tt>null</tt> if job-unrelated
+	 * @param jobId2 second job ID different to <tt>jobId1</tt>
+	 */
+	private void testGetFailsDuringLookup(final JobID jobId1, final JobID jobId2)
+		throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
@@ -86,20 +108,29 @@ public class BlobServerGetTest extends TestLogger {
 			rnd.nextBytes(data);
 
 			// put content addressable (like libraries)
-			BlobKey key = client.put(data);
+			BlobKey key = client.put(jobId1, data);
 			assertNotNull(key);
 
-			// delete all files to make sure that GET requests fail
-			File blobFile = server.getStorageLocation(key);
+			// delete file to make sure that GET requests fail
+			File blobFile = server.getStorageLocation(jobId1, key);
 			assertTrue(blobFile.delete());
 
 			// issue a GET request that fails
-			try {
-				client.get(key);
-				fail("This should not succeed.");
-			} catch (IOException e) {
-				// expected
-			}
+			client = verifyDeleted(client, jobId1, key, serverAddress, config);
+
+			BlobKey key2 = client.put(jobId2, data);
+			assertNotNull(key);
+			assertEquals(key, key2);
+			// request for jobId2 should succeed
+			getFileHelper(client, jobId2, key);
+			// request for jobId1 should still fail
+			client = verifyDeleted(client, jobId1, key, serverAddress, config);
+
+			// same checks as for jobId1 but for jobId2 should also work:
+			blobFile = server.getStorageLocation(jobId2, key);
+			assertTrue(blobFile.delete());
+			client = verifyDeleted(client, jobId2, key, serverAddress, config);
+
 		} finally {
 			if (client != null) {
 				client.close();
@@ -110,8 +141,51 @@ public class BlobServerGetTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Checks that the given blob does not exist anymore.
+	 *
+	 * @param client
+	 * 		BLOB client to use for connecting to the BLOB server
+	 * @param jobId
+	 * 		job ID or <tt>null</tt> if job-unrelated
+	 * @param key
+	 * 		key identifying the BLOB to request
+	 * @param serverAddress
+	 * 		BLOB server address
+	 * @param config
+	 * 		client config
+	 *
+	 * @return a new client (since the old one is being closed on failure)
+	 */
+	private static BlobClient verifyDeleted(
+			BlobClient client, JobID jobId, BlobKey key,
+			InetSocketAddress serverAddress, Configuration config) throws IOException {
+		try {
+			getFileHelper(client, jobId, key);
+			fail("This should not succeed.");
+		} catch (IOException e) {
+			// expected
+		}
+		// need a new client (the old one was closed due to the failure above)
+		return new BlobClient(serverAddress, config);
+	}
+
+	@Test
+	public void testGetFailsDuringStreamingNoJob() throws IOException {
+		testGetFailsDuringStreaming(null);
+	}
+
 	@Test
-	public void testGetFailsDuringStreaming() throws IOException {
+	public void testGetFailsDuringStreamingForJob() throws IOException {
+		testGetFailsDuringStreaming(new JobID());
+	}
+
+	/**
+	 * Checks the correct result if a GET operation fails during the file download.
+	 *
+	 * @param jobId job ID or <tt>null</tt> if job-unrelated
+	 */
+	private void testGetFailsDuringStreaming(final JobID jobId) throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
@@ -128,11 +202,11 @@ public class BlobServerGetTest extends TestLogger {
 			rnd.nextBytes(data);
 
 			// put content addressable (like libraries)
-			BlobKey key = client.put(data);
+			BlobKey key = client.put(jobId, data);
 			assertNotNull(key);
 
 			// issue a GET request that succeeds
-			InputStream is = client.get(key);
+			InputStream is = getFileHelper(client, jobId, key);
 
 			byte[] receiveBuffer = new byte[data.length];
 			int firstChunkLen = 50000;
@@ -169,8 +243,22 @@ public class BlobServerGetTest extends TestLogger {
 	 * Tests that concurrent get operations don't concurrently access the BlobStore to download a blob.
 	 */
 	@Test
-	public void testConcurrentGetOperations() throws IOException, ExecutionException, InterruptedException {
+	public void testConcurrentGetOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
+		testConcurrentGetOperations(null);
+	}
 
+	/**
+	 * FLINK-6020
+	 *
+	 * Tests that concurrent get operations don't concurrently access the BlobStore to download a blob.
+	 */
+	@Test
+	public void testConcurrentGetOperationsForJob() throws IOException, ExecutionException, InterruptedException {
+		testConcurrentGetOperations(new JobID());
+	}
+
+	private void testConcurrentGetOperations(final JobID jobId)
+			throws IOException, InterruptedException, ExecutionException {
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 
@@ -191,14 +279,14 @@ public class BlobServerGetTest extends TestLogger {
 			new Answer() {
 				@Override
 				public Object answer(InvocationOnMock invocation) throws Throwable {
-					File targetFile = (File) invocation.getArguments()[1];
+					File targetFile = (File) invocation.getArguments()[2];
 
 					FileUtils.copyInputStreamToFile(bais, targetFile);
 
 					return null;
 				}
 			}
-		).when(blobStore).get(any(BlobKey.class), any(File.class));
+		).when(blobStore).get(any(JobID.class), any(BlobKey.class), any(File.class));
 
 		final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations);
 
@@ -207,7 +295,7 @@ public class BlobServerGetTest extends TestLogger {
 				CompletableFuture<InputStream> getOperation = CompletableFuture.supplyAsync(
 					() -> {
 						try (BlobClient blobClient = blobServer.createClient();
-							 InputStream inputStream = blobClient.get(blobKey)) {
+							InputStream inputStream = getFileHelper(blobClient, jobId, blobKey)) {
 							byte[] buffer = new byte[data.length];
 
 							IOUtils.readFully(inputStream, buffer);
@@ -241,9 +329,18 @@ public class BlobServerGetTest extends TestLogger {
 			}
 
 			// verify that we downloaded the requested blob exactly once from the BlobStore
-			verify(blobStore, times(1)).get(eq(blobKey), any(File.class));
+			verify(blobStore, times(1)).get(eq(jobId), eq(blobKey), any(File.class));
 		} finally {
 			executor.shutdownNow();
 		}
 	}
+
+	static InputStream getFileHelper(BlobClient blobClient, JobID jobId, BlobKey blobKey)
+		throws IOException {
+		if (jobId == null) {
+			return blobClient.get(blobKey);
+		} else {
+			return blobClient.get(jobId, blobKey);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index c479167..f55adb7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CheckedThread;
@@ -45,6 +46,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.apache.flink.runtime.blob.BlobServerGetTest.getFileHelper;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -71,28 +73,43 @@ public class BlobServerPutTest extends TestLogger {
 	// --- concurrency tests for utility methods which could fail during the put operation ---
 
 	/**
-	 * Checked thread that calls {@link BlobServer#getStorageLocation(BlobKey)}
+	 * Checked thread that calls {@link BlobServer#getStorageLocation(JobID, BlobKey)}.
 	 */
 	public static class ContentAddressableGetStorageLocation extends CheckedThread {
 		private final BlobServer server;
+		private final JobID jobId;
 		private final BlobKey key;
 
-		public ContentAddressableGetStorageLocation(BlobServer server, BlobKey key) {
+		public ContentAddressableGetStorageLocation(BlobServer server, JobID jobId, BlobKey key) {
 			this.server = server;
+			this.jobId = jobId;
 			this.key = key;
 		}
 
 		@Override
 		public void go() throws Exception {
-			server.getStorageLocation(key);
+			server.getStorageLocation(jobId, key);
 		}
 	}
 
 	/**
-	 * Tests concurrent calls to {@link BlobServer#getStorageLocation(BlobKey)}.
+	 * Tests concurrent calls to {@link BlobServer#getStorageLocation(JobID, BlobKey)}.
 	 */
 	@Test
-	public void testServerContentAddressableGetStorageLocationConcurrent() throws Exception {
+	public void testServerContentAddressableGetStorageLocationConcurrentNoJob() throws Exception {
+		testServerContentAddressableGetStorageLocationConcurrent(null);
+	}
+
+	/**
+	 * Tests concurrent calls to {@link BlobServer#getStorageLocation(JobID, BlobKey)}.
+	 */
+	@Test
+	public void testServerContentAddressableGetStorageLocationConcurrentForJob() throws Exception {
+		testServerContentAddressableGetStorageLocationConcurrent(new JobID());
+	}
+
+	private void testServerContentAddressableGetStorageLocationConcurrent(final JobID jobId)
+		throws Exception {
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 
@@ -101,9 +118,9 @@ public class BlobServerPutTest extends TestLogger {
 		try {
 			BlobKey key = new BlobKey();
 			CheckedThread[] threads = new CheckedThread[] {
-				new ContentAddressableGetStorageLocation(server, key),
-				new ContentAddressableGetStorageLocation(server, key),
-				new ContentAddressableGetStorageLocation(server, key)
+				new ContentAddressableGetStorageLocation(server, jobId, key),
+				new ContentAddressableGetStorageLocation(server, jobId, key),
+				new ContentAddressableGetStorageLocation(server, jobId, key)
 			};
 			checkedThreadSimpleTest(threads);
 		} finally {
@@ -134,7 +151,27 @@ public class BlobServerPutTest extends TestLogger {
 	// --------------------------------------------------------------------------------------------
 
 	@Test
-	public void testPutBufferSuccessful() throws IOException {
+	public void testPutBufferSuccessfulGet1() throws IOException {
+		testPutBufferSuccessfulGet(null, null);
+	}
+
+	@Test
+	public void testPutBufferSuccessfulGet2() throws IOException {
+		testPutBufferSuccessfulGet(null, new JobID());
+	}
+
+	@Test
+	public void testPutBufferSuccessfulGet3() throws IOException {
+		testPutBufferSuccessfulGet(new JobID(), new JobID());
+	}
+
+	@Test
+	public void testPutBufferSuccessfulGet4() throws IOException {
+		testPutBufferSuccessfulGet(new JobID(), null);
+	}
+
+	private void testPutBufferSuccessfulGet(final JobID jobId1, final JobID jobId2)
+		throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
@@ -150,17 +187,66 @@ public class BlobServerPutTest extends TestLogger {
 			byte[] data = new byte[2000000];
 			rnd.nextBytes(data);
 
-			// put content addressable (like libraries)
-			BlobKey key1 = client.put(data);
-			assertNotNull(key1);
+			// put data for jobId1 and verify
+			BlobKey key1a = client.put(jobId1, data);
+			assertNotNull(key1a);
+
+			BlobKey key1b = client.put(jobId1, data, 10, 44);
+			assertNotNull(key1b);
+
+			testPutBufferSuccessfulGet(jobId1, key1a, key1b, data, serverAddress, config);
+
+			// now put data for jobId2 and verify that both are ok
+			BlobKey key2a = client.put(jobId2, data);
+			assertNotNull(key2a);
+			assertEquals(key1a, key2a);
+
+			BlobKey key2b = client.put(jobId2, data, 10, 44);
+			assertNotNull(key2b);
+			assertEquals(key1b, key2b);
+
+
+			testPutBufferSuccessfulGet(jobId1, key1a, key1b, data, serverAddress, config);
+			testPutBufferSuccessfulGet(jobId2, key2a, key2b, data, serverAddress, config);
+
+
+		} finally {
+			if (client != null) {
+				client.close();
+			}
+			if (server != null) {
+				server.close();
+			}
+		}
+	}
 
-			BlobKey key2 = client.put(data, 10, 44);
-			assertNotNull(key2);
+	/**
+	 * GET the data stored at the two keys and check that it is equal to <tt>data</tt>.
+	 *
+	 * @param jobId
+	 * 		job ID or <tt>null</tt> if job-unrelated
+	 * @param key1
+	 * 		first key
+	 * @param key2
+	 * 		second key
+	 * @param data
+	 * 		expected data
+	 * @param serverAddress
+	 * 		BlobServer address to connect to
+	 * @param config
+	 * 		client configuration
+	 */
+	private static void testPutBufferSuccessfulGet(
+			JobID jobId, BlobKey key1, BlobKey key2, byte[] data,
+			InetSocketAddress serverAddress, Configuration config) throws IOException {
 
-			// --- GET the data and check that it is equal ---
+		BlobClient client = new BlobClient(serverAddress, config);
+		InputStream is1 = null;
+		InputStream is2 = null;
 
+		try {
 			// one get request on the same client
-			InputStream is1 = client.get(key2);
+			is1 = getFileHelper(client, jobId, key2);
 			byte[] result1 = new byte[44];
 			BlobUtils.readFully(is1, result1, 0, result1.length, null);
 			is1.close();
@@ -173,24 +259,31 @@ public class BlobServerPutTest extends TestLogger {
 			client.close();
 			client = new BlobClient(serverAddress, config);
 
-			InputStream is2 = client.get(key1);
-			byte[] result2 = new byte[data.length];
-			BlobUtils.readFully(is2, result2, 0, result2.length, null);
+			is2 = getFileHelper(client, jobId, key1);
+			BlobClientTest.validateGet(is2, data);
 			is2.close();
-			assertArrayEquals(data, result2);
 		} finally {
-			if (client != null) {
-				client.close();
+			if (is1 != null) {
+				is1.close();
 			}
-			if (server != null) {
-				server.close();
+			if (is2 != null) {
+				is2.close();
 			}
+			client.close();
 		}
 	}
 
+	@Test
+	public void testPutStreamSuccessfulNoJob() throws IOException {
+		testPutStreamSuccessful(null);
+	}
 
 	@Test
-	public void testPutStreamSuccessful() throws IOException {
+	public void testPutStreamSuccessfulForJob() throws IOException {
+		testPutStreamSuccessful(new JobID());
+	}
+
+	private void testPutStreamSuccessful(final JobID jobId) throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
@@ -208,7 +301,12 @@ public class BlobServerPutTest extends TestLogger {
 
 			// put content addressable (like libraries)
 			{
-				BlobKey key1 = client.put(new ByteArrayInputStream(data));
+				BlobKey key1;
+				if (jobId == null) {
+					key1 = client.put(new ByteArrayInputStream(data));
+				} else {
+					key1 = client.put(jobId, new ByteArrayInputStream(data));
+				}
 				assertNotNull(key1);
 			}
 		} finally {
@@ -226,7 +324,16 @@ public class BlobServerPutTest extends TestLogger {
 	}
 
 	@Test
-	public void testPutChunkedStreamSuccessful() throws IOException {
+	public void testPutChunkedStreamSuccessfulNoJob() throws IOException {
+		testPutChunkedStreamSuccessful(null);
+	}
+
+	@Test
+	public void testPutChunkedStreamSuccessfulForJob() throws IOException {
+		testPutChunkedStreamSuccessful(new JobID());
+	}
+
+	private void testPutChunkedStreamSuccessful(final JobID jobId) throws IOException {
 		BlobServer server = null;
 		BlobClient client = null;
 
@@ -244,7 +351,12 @@ public class BlobServerPutTest extends TestLogger {
 
 			// put content addressable (like libraries)
 			{
-				BlobKey key1 = client.put(new ChunkedInputStream(data, 19));
+				BlobKey key1;
+				if (jobId == null) {
+					key1 = client.put(new ChunkedInputStream(data, 19));
+				} else {
+					key1 = client.put(jobId, new ChunkedInputStream(data, 19));
+				}
 				assertNotNull(key1);
 			}
 		} finally {
@@ -258,7 +370,16 @@ public class BlobServerPutTest extends TestLogger {
 	}
 
 	@Test
-	public void testPutBufferFails() throws IOException {
+	public void testPutBufferFailsNoJob() throws IOException {
+		testPutBufferFails(null);
+	}
+
+	@Test
+	public void testPutBufferFailsForJob() throws IOException {
+		testPutBufferFails(new JobID());
+	}
+
+	private void testPutBufferFails(final JobID jobId) throws IOException {
 		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
 		BlobServer server = null;
@@ -285,7 +406,7 @@ public class BlobServerPutTest extends TestLogger {
 
 			// put content addressable (like libraries)
 			try {
-				client.put(data);
+				client.put(jobId, data);
 				fail("This should fail.");
 			}
 			catch (IOException e) {
@@ -293,7 +414,7 @@ public class BlobServerPutTest extends TestLogger {
 			}
 
 			try {
-				client.put(data);
+				client.put(jobId, data);
 				fail("Client should be closed");
 			}
 			catch (IllegalStateException e) {
@@ -320,7 +441,22 @@ public class BlobServerPutTest extends TestLogger {
 	 * Tests that concurrent put operations will only upload the file once to the {@link BlobStore}.
 	 */
 	@Test
-	public void testConcurrentPutOperations() throws IOException, ExecutionException, InterruptedException {
+	public void testConcurrentPutOperationsNoJob() throws IOException, ExecutionException, InterruptedException {
+		testConcurrentPutOperations(null);
+	}
+
+	/**
+	 * FLINK-6020
+	 *
+	 * Tests that concurrent put operations will only upload the file once to the {@link BlobStore}.
+	 */
+	@Test
+	public void testConcurrentPutOperationsForJob() throws IOException, ExecutionException, InterruptedException {
+		testConcurrentPutOperations(new JobID());
+	}
+
+	private void testConcurrentPutOperations(final JobID jobId)
+			throws IOException, InterruptedException, ExecutionException {
 		final Configuration config = new Configuration();
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
 
@@ -331,7 +467,7 @@ public class BlobServerPutTest extends TestLogger {
 		final CountDownLatch countDownLatch = new CountDownLatch(concurrentPutOperations);
 		final byte[] data = new byte[dataSize];
 
-		ArrayList<CompletableFuture<BlobKey>> allFutures = new ArrayList(concurrentPutOperations);
+		ArrayList<CompletableFuture<BlobKey>> allFutures = new ArrayList<>(concurrentPutOperations);
 
 		ExecutorService executor = Executors.newFixedThreadPool(concurrentPutOperations);
 
@@ -342,7 +478,13 @@ public class BlobServerPutTest extends TestLogger {
 				CompletableFuture<BlobKey> putFuture = CompletableFuture.supplyAsync(
 					() -> {
 						try (BlobClient blobClient = blobServer.createClient()) {
-							return blobClient.put(new BlockingInputStream(countDownLatch, data));
+							if (jobId == null) {
+								return blobClient
+									.put(new BlockingInputStream(countDownLatch, data));
+							} else {
+								return blobClient
+									.put(jobId, new BlockingInputStream(countDownLatch, data));
+							}
 						} catch (IOException e) {
 							throw new FlinkFutureException("Could not upload blob.", e);
 						}
@@ -369,7 +511,7 @@ public class BlobServerPutTest extends TestLogger {
 			}
 
 			// check that we only uploaded the file once to the blob store
-			verify(blobStore, times(1)).put(any(File.class), eq(blobKey));
+			verify(blobStore, times(1)).put(any(File.class), eq(jobId), eq(blobKey));
 		} finally {
 			executor.shutdownNow();
 		}

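The Mockito stubs in the two concurrency tests above also show how the widened API propagates to the BlobStore interface backing HA storage: get(JobID, BlobKey, File) downloads a BLOB into a local target file, and put(File, JobID, BlobKey) uploads one. An outline of the implied interface (parameter names and void return types are assumptions, derived only from the mock invocations in this diff):

    interface BlobStore {
        // copy a locally stored BLOB into (HA) blob storage
        void put(File localFile, JobID jobId, BlobKey blobKey) throws IOException;

        // copy a BLOB from (HA) blob storage into the given local file
        void get(JobID jobId, BlobKey blobKey, File targetFile) throws IOException;

        // remaining methods (delete, ...) elided
    }
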
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
index 2987c39..e449aab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
@@ -18,11 +18,9 @@
 
 package org.apache.flink.runtime.blob;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeTrue;
-import static org.mockito.Mockito.mock;
-
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.OperatingSystem;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -32,6 +30,10 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.mock;
+
 public class BlobUtilsTest {
 
 	private final static String CANNOT_CREATE_THIS = "cannot-create-this";
@@ -62,12 +64,18 @@ public class BlobUtilsTest {
 	public void testExceptionOnCreateStorageDirectoryFailure() throws
 		IOException {
 		// Should throw an Exception
-		BlobUtils.initStorageDirectory(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS).getAbsolutePath());
+		BlobUtils.initLocalStorageDirectory(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS).getAbsolutePath());
+	}
+
+	@Test(expected = Exception.class)
+	public void testExceptionOnCreateCacheDirectoryFailureNoJob() {
+		// Should throw an Exception
+		BlobUtils.getStorageLocation(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS), null, mock(BlobKey.class));
 	}
 
 	@Test(expected = Exception.class)
-	public void testExceptionOnCreateCacheDirectoryFailure() {
+	public void testExceptionOnCreateCacheDirectoryFailureForJob() {
 		// Should throw an Exception
-		BlobUtils.getStorageLocation(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS), mock(BlobKey.class));
+		BlobUtils.getStorageLocation(new File(blobUtilsTestDirectory, CANNOT_CREATE_THIS), new JobID(), mock(BlobKey.class));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 476fdcb..b43a307 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -73,17 +73,20 @@ public class BlobLibraryCacheManagerTest {
 			InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort());
 			BlobClient bc = new BlobClient(blobSocketAddress, config);
 
-			keys.add(bc.put(buf));
+			// TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
+			JobID jobId = null;
+
+			keys.add(bc.put(jobId, buf));
 			buf[0] += 1;
-			keys.add(bc.put(buf));
+			keys.add(bc.put(jobId, buf));
 
 			bc.close();
 
-			long cleanupInterval = 1000l;
+			long cleanupInterval = 1000L;
 			libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval);
 			libraryCacheManager.registerJob(jid, keys, Collections.<URL>emptyList());
 
-			assertEquals(2, checkFilesExist(keys, server, true));
+			assertEquals(2, checkFilesExist(jobId, keys, server, true));
 			assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries());
 			assertEquals(1, libraryCacheManager.getNumberOfReferenceHolders(jid));
 
@@ -105,17 +108,25 @@ public class BlobLibraryCacheManagerTest {
 			assertEquals(0, libraryCacheManager.getNumberOfReferenceHolders(jid));
 
 			// the blob cache should no longer contain the files
-			assertEquals(0, checkFilesExist(keys, server, false));
+			assertEquals(0, checkFilesExist(jobId, keys, server, false));
 
 			try {
-				server.getFile(keys.get(0));
-				fail("name-addressable BLOB should have been deleted");
+				if (jobId == null) {
+					server.getFile(keys.get(0));
+				} else {
+					server.getFile(jobId, keys.get(0));
+				}
+				fail("BLOB should have been deleted");
 			} catch (IOException e) {
 				// expected
 			}
 			try {
-				server.getFile(keys.get(1));
-				fail("name-addressable BLOB should have been deleted");
+				if (jobId == null) {
+					server.getFile(keys.get(1));
+				} else {
+					server.getFile(jobId, keys.get(1));
+				}
+				fail("BLOB should have been deleted");
 			} catch (IOException e) {
 				// expected
 			}
@@ -150,16 +161,20 @@ public class BlobLibraryCacheManagerTest {
 	 * @param doThrow
 	 * 		whether exceptions should be ignored (<tt>false</tt>), or throws (<tt>true</tt>)
 	 *
-	 * @return number of files we were able to retrieve via {@link BlobService#getFile(BlobKey)}
+	 * @return number of files we were able to retrieve via {@link BlobService#getFile}
 	 */
 	private static int checkFilesExist(
-		List<BlobKey> keys, BlobService blobService, boolean doThrow)
+			JobID jobId, List<BlobKey> keys, BlobService blobService, boolean doThrow)
 			throws IOException {
 		int numFiles = 0;
 
 		for (BlobKey key : keys) {
 			try {
-				blobService.getFile(key);
+				if (jobId == null) {
+					blobService.getFile(key);
+				} else {
+					blobService.getFile(jobId, key);
+				}
 				++numFiles;
 			} catch (IOException e) {
 				if (doThrow) {
@@ -196,22 +211,26 @@ public class BlobLibraryCacheManagerTest {
 			InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort());
 			BlobClient bc = new BlobClient(blobSocketAddress, config);
 
-			keys.add(bc.put(buf));
+			// TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
+//			JobID jobId = new JobID();
+			JobID jobId = null;
+
+			keys.add(bc.put(jobId, buf));
 			buf[0] += 1;
-			keys.add(bc.put(buf));
+			keys.add(bc.put(jobId, buf));
 
-			long cleanupInterval = 1000l;
+			long cleanupInterval = 1000L;
 			libraryCacheManager = new BlobLibraryCacheManager(server, cleanupInterval);
 			libraryCacheManager.registerTask(jid, executionId1, keys, Collections.<URL>emptyList());
 			libraryCacheManager.registerTask(jid, executionId2, keys, Collections.<URL>emptyList());
 
-			assertEquals(2, checkFilesExist(keys, server, true));
+			assertEquals(2, checkFilesExist(jobId, keys, server, true));
 			assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries());
 			assertEquals(2, libraryCacheManager.getNumberOfReferenceHolders(jid));
 
 			libraryCacheManager.unregisterTask(jid, executionId1);
 
-			assertEquals(2, checkFilesExist(keys, server, true));
+			assertEquals(2, checkFilesExist(jobId, keys, server, true));
 			assertEquals(2, libraryCacheManager.getNumberOfCachedLibraries());
 			assertEquals(1, libraryCacheManager.getNumberOfReferenceHolders(jid));
 
@@ -233,7 +252,7 @@ public class BlobLibraryCacheManagerTest {
 			assertEquals(0, libraryCacheManager.getNumberOfReferenceHolders(jid));
 
 			// the blob cache should no longer contain the files
-			assertEquals(0, checkFilesExist(keys, server, false));
+			assertEquals(0, checkFilesExist(jobId, keys, server, false));
 
 			bc.close();
 		} finally {
@@ -269,10 +288,13 @@ public class BlobLibraryCacheManagerTest {
 			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
 			cache = new BlobCache(serverAddress, config, new VoidBlobStore());
 
+			// TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
+			JobID jobId = null;
+
 			// upload some meaningless data to the server
 			BlobClient uploader = new BlobClient(serverAddress, config);
-			BlobKey dataKey1 = uploader.put(new byte[]{1, 2, 3, 4, 5, 6, 7, 8});
-			BlobKey dataKey2 = uploader.put(new byte[]{11, 12, 13, 14, 15, 16, 17, 18});
+			BlobKey dataKey1 = uploader.put(jobId, new byte[]{1, 2, 3, 4, 5, 6, 7, 8});
+			BlobKey dataKey2 = uploader.put(jobId, new byte[]{11, 12, 13, 14, 15, 16, 17, 18});
 			uploader.close();
 
 			BlobLibraryCacheManager libCache = new BlobLibraryCacheManager(cache, 1000000000L);
@@ -316,11 +338,12 @@ public class BlobLibraryCacheManagerTest {
 					fail("Should fail with an IllegalStateException");
 				}
 				catch (IllegalStateException e) {
-					// that#s what we want
+					// that's what we want
 				}
 			}
 
-			cacheDir = new File(cache.getStorageDir(), "cache");
+			// see BlobUtils for the directory layout
+			cacheDir = new File(cache.getStorageDir(), "no_job");
 			assertTrue(cacheDir.exists());
 
 			// make sure no further blobs can be downloaded by removing the write

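The cacheDir change above ("cache" becomes "no_job"), together with the "see BlobUtils for the directory layout" comment, hints at the new on-disk layout: job-unrelated BLOBs now live in a no_job subdirectory of the storage root. A sketch of the assumed layout (only the no_job name appears in this diff; the per-job directory and file naming are assumptions):

    <storage-dir>/no_job/blob_<key>        // jobId == null
    <storage-dir>/job_<jobId>/blob_<key>   // job-related BLOBs (assumed naming)
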
http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index b19835b..2f6738d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -91,10 +91,14 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 
 			List<BlobKey> keys = new ArrayList<>(2);
 
+			JobID jobId = new JobID();
+			// TODO: replace+adapt by jobId after adapting the BlobLibraryCacheManager
+			JobID blobJobId = null;
+
 			// Upload some data (libraries)
 			try (BlobClient client = new BlobClient(serverAddress[0], config)) {
-				keys.add(client.put(expected)); // Request 1
-				keys.add(client.put(expected, 32, 256)); // Request 2
+				keys.add(client.put(blobJobId, expected)); // Request 1
+				keys.add(client.put(blobJobId, expected, 32, 256)); // Request 2
 			}
 
 			// The cache
@@ -102,7 +106,6 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 			libCache = new BlobLibraryCacheManager(cache, 3600 * 1000);
 
 			// Register uploaded libraries
-			JobID jobId = new JobID();
 			ExecutionAttemptID executionId = new ExecutionAttemptID();
 			libServer[0].registerTask(jobId, executionId, keys, Collections.<URL>emptyList());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0a19c456/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 79b9c1c..3c75971 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -136,9 +137,11 @@ public class JobSubmitTest {
 			// upload two dummy bytes and add their keys to the job graph as dependencies
 			BlobKey key1, key2;
 			BlobClient bc = new BlobClient(new InetSocketAddress("localhost", blobPort), jmConfig);
+			// TODO: make use of job-related BLOBs after adapting the BlobLibraryCacheManager
+			JobID jobId = null;
 			try {
-				key1 = bc.put(new byte[10]);
-				key2 = bc.put(new byte[10]);
+				key1 = bc.put(jobId, new byte[10]);
+				key2 = bc.put(jobId, new byte[10]);
 
 				// delete one of the blobs to make sure that the startup failed
 				bc.delete(key2);

