flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [4/5] flink git commit: [FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices
Date Wed, 17 May 2017 06:18:02 GMT
[FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices

The HighAvailabilityServices instance creates a single BlobStoreService instance which is
shared by all BlobServer and BlobCache instances. The BlobStoreService's lifecycle
is exclusively managed by the HighAvailabilityServices. This means that the
BlobStore's content is only cleaned up if the HighAvailabilityServices' HA data
is cleaned up. Having this single point of control makes it easier to decide when
to discard HA data (e.g. in case of a successful job execution) and when to retain
the data (e.g. for recovery).

Close and cleanup all data of BlobStore in HighAvailabilityServices

Use HighAvailabilityServices to create BlobStore

Introduce BlobStoreService interface to hide close and closeAndCleanupAllData methods

This closes #3864.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3ea89a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3ea89a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3ea89a9

Branch: refs/heads/release-1.3
Commit: e3ea89a9fab39e7595c466bcd90c30c338c86f4e
Parents: 827d74e
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue May 9 10:26:37 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed May 17 08:16:59 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/hdfstests/HDFSTest.java    | 14 ++-
 .../clusterframework/MesosTaskManager.scala     |  6 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java   | 26 +++++-
 .../handlers/TaskManagerLogHandler.java         | 11 ++-
 .../webmonitor/WebRuntimeMonitorITCase.java     | 14 ++-
 .../handlers/TaskManagerLogHandlerTest.java     | 11 ++-
 .../apache/flink/runtime/blob/BlobCache.java    | 80 ++++-------------
 .../apache/flink/runtime/blob/BlobServer.java   | 38 ++++----
 .../apache/flink/runtime/blob/BlobService.java  |  8 +-
 .../apache/flink/runtime/blob/BlobStore.java    | 26 +-----
 .../flink/runtime/blob/BlobStoreService.java    | 32 +++++++
 .../apache/flink/runtime/blob/BlobUtils.java    | 44 +++++++--
 .../org/apache/flink/runtime/blob/BlobView.java | 49 +++++++++++
 .../flink/runtime/blob/FileSystemBlobStore.java | 11 ++-
 .../flink/runtime/blob/VoidBlobStore.java       |  8 +-
 .../apache/flink/runtime/client/JobClient.java  | 13 ++-
 .../runtime/client/JobListeningContext.java     |  6 +-
 .../clusterframework/BootstrapTools.java        |  8 +-
 .../librarycache/BlobLibraryCacheManager.java   |  2 +-
 .../HighAvailabilityServicesUtils.java          | 12 ++-
 .../nonha/AbstractNonHaServices.java            |  5 +-
 .../zookeeper/ZooKeeperHaServices.java          | 93 ++++++++++----------
 .../runtime/jobmaster/JobManagerServices.java   |  2 +-
 .../runtime/taskexecutor/TaskExecutor.java      |  5 +-
 .../runtime/webmonitor/WebMonitorUtils.java     | 21 +++--
 .../flink/runtime/jobmanager/JobManager.scala   | 21 ++---
 .../runtime/minicluster/FlinkMiniCluster.scala  |  8 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |  8 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 36 +++++---
 .../runtime/blob/BlobCacheRetriesTest.java      | 79 ++++++++---------
 .../runtime/blob/BlobCacheSuccessTest.java      | 56 ++++++------
 .../flink/runtime/blob/BlobClientSslTest.java   | 52 +++++------
 .../flink/runtime/blob/BlobClientTest.java      | 16 ++--
 .../flink/runtime/blob/BlobRecoveryITCase.java  | 21 +++--
 .../runtime/blob/BlobServerDeleteTest.java      | 21 +++--
 .../flink/runtime/blob/BlobServerGetTest.java   | 41 +++------
 .../flink/runtime/blob/BlobServerPutTest.java   | 89 +++++--------------
 .../flink/runtime/blob/BlobServerRangeTest.java | 10 +--
 .../runtime/blob/TestingFailingBlobServer.java  |  4 +-
 .../BlobLibraryCacheManagerTest.java            | 33 +++----
 .../BlobLibraryCacheRecoveryITCase.java         | 21 +++--
 .../zookeeper/ZooKeeperRegistryTest.java        |  7 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  9 +-
 .../JobManagerLeaderElectionTest.java           |  3 +-
 .../ZooKeeperLeaderRetrievalTest.java           | 13 ++-
 .../runtime/metrics/TaskManagerMetricsTest.java |  2 +-
 ...askManagerComponentsStartupShutdownTest.java |  5 +-
 .../TaskManagerRegistrationTest.java            |  5 +-
 .../src/test/resources/log4j-test.properties    |  2 +-
 .../jobmanager/JobManagerRegistrationTest.scala |  3 +-
 .../testingUtils/TestingTaskManager.scala       | 40 ++++-----
 ...agerHAProcessFailureBatchRecoveryITCase.java |  1 +
 .../flink/yarn/TestingYarnTaskManager.scala     | 25 +++---
 .../YarnHighAvailabilityServices.java           | 30 ++++++-
 .../org/apache/flink/yarn/YarnTaskManager.scala |  6 +-
 55 files changed, 667 insertions(+), 545 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index 8a3f662..0815863 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -31,6 +31,8 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.examples.java.wordcount.WordCount;
 import org.apache.flink.runtime.blob.BlobRecoveryITCase;
+import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.blob.BlobUtils;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.FileUtils;
@@ -234,7 +236,17 @@ public class HDFSTest {
 		config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
 
-		BlobRecoveryITCase.testBlobServerRecovery(config);
+		BlobStoreService blobStoreService = null;
+
+		try {
+			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
+
+			BlobRecoveryITCase.testBlobServerRecovery(config, blobStoreService);
+		} finally {
+			if (blobStoreService != null) {
+				blobStoreService.closeAndCleanupAllData();
+			}
+		}
 	}
 
 	// package visible

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
index e8d6a58..7834639 100644
--- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
+++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.mesos.runtime.clusterframework
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
@@ -38,7 +38,7 @@ class MesosTaskManager(
     ioManager: IOManager,
     network: NetworkEnvironment,
     numberOfSlots: Int,
-    leaderRetrievalService: LeaderRetrievalService,
+    highAvailabilityServices: HighAvailabilityServices,
     metricRegistry : MetricRegistry)
   extends TaskManager(
     config,
@@ -48,7 +48,7 @@ class MesosTaskManager(
     ioManager,
     network,
     numberOfSlots,
-    leaderRetrievalService,
+    highAvailabilityServices,
     metricRegistry) {
 
   override def handleMessage: Receive = {

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index f83fa27..5c66545 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.net.SSLUtils;
@@ -147,6 +148,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 	public WebRuntimeMonitor(
 			Configuration config,
 			LeaderRetrievalService leaderRetrievalService,
+			BlobView blobView,
 			ActorSystem actorSystem) throws IOException, InterruptedException {
 
 		this.leaderRetrievalService = checkNotNull(leaderRetrievalService);
@@ -279,10 +281,26 @@ public class WebRuntimeMonitor implements WebMonitor {
 		GET(router, new JobMetricsHandler(metricFetcher));
 
 		GET(router, new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher));
-		GET(router, new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout,
-				TaskManagerLogHandler.FileMode.LOG, config, enableSSL));
-		GET(router, new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout,
-				TaskManagerLogHandler.FileMode.STDOUT, config, enableSSL));
+		GET(router,
+			new TaskManagerLogHandler(
+				retriever,
+				context,
+				jobManagerAddressPromise.future(),
+				timeout,
+				TaskManagerLogHandler.FileMode.LOG,
+				config,
+				enableSSL,
+				blobView));
+		GET(router,
+			new TaskManagerLogHandler(
+				retriever,
+				context,
+				jobManagerAddressPromise.future(),
+				timeout,
+				TaskManagerLogHandler.FileMode.STDOUT,
+				config,
+				enableSSL,
+				blobView));
 		GET(router, new TaskManagerMetricsHandler(metricFetcher));
 
 		router

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 37ee814..53ee336 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -50,6 +50,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
@@ -62,6 +63,7 @@ import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
 import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,6 +118,8 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 
 	private final Time timeTimeout;
 
+	private final BlobView blobView;
+
 	public enum FileMode {
 		LOG,
 		STDOUT
@@ -128,7 +132,8 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 		FiniteDuration timeout,
 		FileMode fileMode,
 		Configuration config,
-		boolean httpsEnabled) {
+		boolean httpsEnabled,
+		BlobView blobView) {
 		super(retriever, localJobManagerAddressPromise, timeout, httpsEnabled);
 
 		this.executor = checkNotNull(executor);
@@ -142,6 +147,8 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 				break;
 		}
 
+		this.blobView = Preconditions.checkNotNull(blobView, "blobView");
+
 		timeTimeout = Time.milliseconds(timeout.toMillis());
 	}
 
@@ -167,7 +174,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 					Option<String> hostOption = jobManager.actor().path().address().host();
 					String host = hostOption.isDefined() ? hostOption.get() : "localhost";
 					int port = (int) result;
-					return new BlobCache(new InetSocketAddress(host, port), config);
+					return new BlobCache(new InetSocketAddress(host, port), config, blobView);
 				}
 			}, executor);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 5ccfe90..b418141 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -153,6 +154,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 				webMonitor[i] = new WebRuntimeMonitor(
 					config,
 					highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+					highAvailabilityServices.createBlobStore(),
 					jobManagerSystem[i]);
 			}
 
@@ -293,9 +295,11 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 
 			actorSystem = AkkaUtils.createDefaultActorSystem();
 
-			LeaderRetrievalService leaderRetrievalService = mock(LeaderRetrievalService.class);
 			webRuntimeMonitor = new WebRuntimeMonitor(
-					config, leaderRetrievalService, actorSystem);
+				config,
+				mock(LeaderRetrievalService.class),
+				mock(BlobView.class),
+				actorSystem);
 
 			webRuntimeMonitor.start("akka://schmakka");
 
@@ -466,10 +470,12 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
 		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
 
+		HighAvailabilityServices highAvailabilityServices = flink.highAvailabilityServices();
+
 		WebRuntimeMonitor webMonitor = new WebRuntimeMonitor(
 			config,
-			flink.highAvailabilityServices().getJobManagerLeaderRetriever(
-				HighAvailabilityServices.DEFAULT_JOB_ID),
+			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+			highAvailabilityServices.createBlobStore(),
 			jmActorSystem);
 
 		webMonitor.start(jobManagerAddress);

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
index 4177f44..3d8f1a3 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Executors;
@@ -53,7 +54,6 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.mockito.Matchers.any;
@@ -71,7 +71,8 @@ public class TaskManagerLogHandlerTest {
 			AkkaUtils.getDefaultClientTimeout(),
 			TaskManagerLogHandler.FileMode.LOG,
 			new Configuration(),
-			false);
+			false,
+			new VoidBlobStore());
 		String[] pathsLog = handlerLog.getPaths();
 		Assert.assertEquals(1, pathsLog.length);
 		Assert.assertEquals("/taskmanagers/:taskmanagerid/log", pathsLog[0]);
@@ -83,7 +84,8 @@ public class TaskManagerLogHandlerTest {
 			AkkaUtils.getDefaultClientTimeout(),
 			TaskManagerLogHandler.FileMode.STDOUT,
 			new Configuration(),
-			false);
+			false,
+			new VoidBlobStore());
 		String[] pathsOut = handlerOut.getPaths();
 		Assert.assertEquals(1, pathsOut.length);
 		Assert.assertEquals("/taskmanagers/:taskmanagerid/stdout", pathsOut[0]);
@@ -131,7 +133,8 @@ public class TaskManagerLogHandlerTest {
 			AkkaUtils.getDefaultClientTimeout(),
 			TaskManagerLogHandler.FileMode.LOG,
 			new Configuration(),
-			false);
+			false,
+			new VoidBlobStore());
 
 		final AtomicReference<String> exception = new AtomicReference<>();
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 2587b15..2eb103a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 import org.slf4j.Logger;
@@ -58,7 +57,7 @@ public final class BlobCache implements BlobService {
 	private final File storageDir;
 
 	/** Blob store for distributed file storage, e.g. in HA */
-	private final BlobStore blobStore;
+	private final BlobView blobView;
 
 	private final AtomicBoolean shutdownRequested = new AtomicBoolean();
 
@@ -78,55 +77,19 @@ public final class BlobCache implements BlobService {
 	 * 		address of the {@link BlobServer} to use for fetching files from
 	 * @param blobClientConfig
 	 * 		global configuration
-	 *
-	 * @throws IOException
-	 * 		thrown if the (local or distributed) file storage cannot be created or
-	 * 		is not usable
-	 */
-	public BlobCache(InetSocketAddress serverAddress,
-			Configuration blobClientConfig) throws IOException {
-		this(serverAddress, blobClientConfig,
-			BlobUtils.createBlobStoreFromConfig(blobClientConfig));
-	}
-
-	/**
-	 * Instantiates a new BLOB cache.
-	 *
-	 * @param serverAddress
-	 * 		address of the {@link BlobServer} to use for fetching files from
-	 * @param blobClientConfig
-	 * 		global configuration
-	 * 	@param haServices
-	 * 		high availability services able to create a distributed blob store
-	 *
-	 * @throws IOException
-	 * 		thrown if the (local or distributed) file storage cannot be created or
-	 * 		is not usable
-	 */
-	public BlobCache(InetSocketAddress serverAddress,
-		Configuration blobClientConfig, HighAvailabilityServices haServices) throws IOException {
-		this(serverAddress, blobClientConfig, haServices.createBlobStore());
-	}
-
-	/**
-	 * Instantiates a new BLOB cache.
-	 *
-	 * @param serverAddress
-	 * 		address of the {@link BlobServer} to use for fetching files from
-	 * @param blobClientConfig
-	 * 		global configuration
-	 * @param blobStore
+	 * @param blobView
 	 * 		(distributed) blob store file system to retrieve files from first
 	 *
 	 * @throws IOException
 	 * 		thrown if the (local or distributed) file storage cannot be created or is not usable
 	 */
-	private BlobCache(
-			final InetSocketAddress serverAddress, final Configuration blobClientConfig,
-			final BlobStore blobStore) throws IOException {
+	public BlobCache(
+			final InetSocketAddress serverAddress,
+			final Configuration blobClientConfig,
+			final BlobView blobView) throws IOException {
 		this.serverAddress = checkNotNull(serverAddress);
 		this.blobClientConfig = checkNotNull(blobClientConfig);
-		this.blobStore = blobStore;
+		this.blobView = checkNotNull(blobView, "blobStore");
 
 		// configure and create the storage directory
 		String storageDirectory = blobClientConfig.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
@@ -169,7 +132,7 @@ public final class BlobCache implements BlobService {
 
 		// first try the distributed blob store (if available)
 		try {
-			blobStore.get(requiredBlob, localJarFile);
+			blobView.get(requiredBlob, localJarFile);
 		} catch (Exception e) {
 			LOG.info("Failed to copy from blob store. Downloading from BLOB server instead.", e);
 		}
@@ -294,28 +257,23 @@ public final class BlobCache implements BlobService {
 	}
 
 	@Override
-	public void shutdown() {
+	public void close() throws IOException {
 		if (shutdownRequested.compareAndSet(false, true)) {
 			LOG.info("Shutting down BlobCache");
 
 			// Clean up the storage directory
 			try {
 				FileUtils.deleteDirectory(storageDir);
-			}
-			catch (IOException e) {
-				LOG.error("BLOB cache failed to properly clean up its storage directory.");
-			}
-
-			// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself
-			if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
-				try {
-					Runtime.getRuntime().removeShutdownHook(shutdownHook);
-				}
-				catch (IllegalStateException e) {
-					// race, JVM is in shutdown already, we can safely ignore this
-				}
-				catch (Throwable t) {
-					LOG.warn("Exception while unregistering BLOB cache's cleanup shutdown hook.");
+			} finally {
+				// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself
+				if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
+					try {
+						Runtime.getRuntime().removeShutdownHook(shutdownHook);
+					} catch (IllegalStateException e) {
+						// race, JVM is in shutdown already, we can safely ignore this
+					} catch (Throwable t) {
+						LOG.warn("Exception while unregistering BLOB cache's cleanup shutdown hook.");
+					}
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 8a70559..a006981 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -21,9 +21,9 @@ package org.apache.flink.runtime.blob;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.NetUtils;
 
@@ -94,19 +94,14 @@ public class BlobServer extends Thread implements BlobService {
 	/**
 	 * Instantiates a new BLOB server and binds it to a free network port.
 	 *
+	 * @param config Configuration to be used to instantiate the BlobServer
+	 * @param blobStore BlobStore to store blobs persistently
+	 *
 	 * @throws IOException
 	 * 		thrown if the BLOB server cannot bind to a free network port or if the
 	 * 		(local or distributed) file storage cannot be created or is not usable
 	 */
-	public BlobServer(Configuration config) throws IOException {
-		this(config, BlobUtils.createBlobStoreFromConfig(config));
-	}
-
-	public BlobServer(Configuration config, HighAvailabilityServices haServices) throws IOException {
-		this(config, haServices.createBlobStore());
-	}
-
-	private BlobServer(Configuration config, BlobStore blobStore) throws IOException {
+	public BlobServer(Configuration config, BlobStore blobStore) throws IOException {
 		this.blobServiceConfiguration = checkNotNull(config);
 		this.blobStore = checkNotNull(blobStore);
 
@@ -271,7 +266,12 @@ public class BlobServer extends Thread implements BlobService {
 		catch (Throwable t) {
 			if (!this.shutdownRequested.get()) {
 				LOG.error("BLOB server stopped working. Shutting down", t);
-				shutdown();
+
+				try {
+					close();
+				} catch (Throwable closeThrowable) {
+					LOG.error("Could not properly close the BlobServer.", closeThrowable);
+				}
 			}
 		}
 	}
@@ -280,13 +280,15 @@ public class BlobServer extends Thread implements BlobService {
 	 * Shuts down the BLOB server.
 	 */
 	@Override
-	public void shutdown() {
+	public void close() throws IOException {
 		if (shutdownRequested.compareAndSet(false, true)) {
+			Exception exception = null;
+
 			try {
 				this.serverSocket.close();
 			}
 			catch (IOException ioe) {
-				LOG.debug("Error while closing the server socket.", ioe);
+				exception = ioe;
 			}
 
 			// wake the thread up, in case it is waiting on some operation
@@ -296,13 +298,15 @@ public class BlobServer extends Thread implements BlobService {
 				join();
 			}
 			catch (InterruptedException ie) {
+				Thread.currentThread().interrupt();
+
 				LOG.debug("Error while waiting for this thread to die.", ie);
 			}
 
 			synchronized (activeConnections) {
 				if (!activeConnections.isEmpty()) {
 					for (BlobServerConnection conn : activeConnections) {
-						LOG.debug("Shutting down connection " + conn.getName());
+						LOG.debug("Shutting down connection {}.", conn.getName());
 						conn.close();
 					}
 					activeConnections.clear();
@@ -314,7 +318,7 @@ public class BlobServer extends Thread implements BlobService {
 				FileUtils.deleteDirectory(storageDir);
 			}
 			catch (IOException e) {
-				LOG.error("BLOB server failed to properly clean up its storage directory.");
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
 			}
 
 			// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
@@ -327,13 +331,15 @@ public class BlobServer extends Thread implements BlobService {
 					// race, JVM is in shutdown already, we can safely ignore this
 				}
 				catch (Throwable t) {
-					LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook.");
+					LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook.", t);
 				}
 			}
 
 			if(LOG.isInfoEnabled()) {
 				LOG.info("Stopped BLOB server at {}:{}", serverSocket.getInetAddress().getHostAddress(), getPort());
 			}
+
+			ExceptionUtils.tryRethrowIOException(exception);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
index 419ee8d..97a2d51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.runtime.blob;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.URL;
 
 /**
  * A simple store and retrieve binary large objects (BLOBs).
  */
-public interface BlobService {
+public interface BlobService extends Closeable {
 
 	/**
 	 * This method returns the URL of the file associated with the provided blob key.
@@ -49,11 +50,6 @@ public interface BlobService {
 	 * @return the port of the blob service.
 	 */
 	int getPort();
-
-	/**
-	 * Shutdown method which is called to terminate the blob service.
-	 */
-	void shutdown();
 	
 	BlobClient createClient() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
index 64dc942..4c26a5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
@@ -26,7 +26,7 @@ import java.io.IOException;
 /**
  * A blob store.
  */
-public interface BlobStore {
+public interface BlobStore extends BlobView {
 
 	/**
 	 * Copies the local file to the blob store.
@@ -50,25 +50,6 @@ public interface BlobStore {
 	void put(File localFile, JobID jobId, String key) throws IOException;
 
 	/**
-	 * Copies a blob to a local file.
-	 *
-	 * @param blobKey   The blob ID
-	 * @param localFile The local file to copy to
-	 * @throws IOException If the copy fails
-	 */
-	void get(BlobKey blobKey, File localFile) throws IOException;
-
-	/**
-	 * Copies a blob to a local file.
-	 *
-	 * @param jobId     The JobID part of ID for the blob
-	 * @param key       The String part of ID for the blob
-	 * @param localFile The local file to copy to
-	 * @throws IOException If the copy fails
-	 */
-	void get(JobID jobId, String key, File localFile) throws IOException;
-
-	/**
 	 * Tries to delete a blob from storage.
 	 *
 	 * <p>NOTE: This also tries to delete any created directories if empty.</p>
@@ -95,9 +76,4 @@ public interface BlobStore {
 	 * @param jobId The JobID part of all blobs to delete
 	 */
 	void deleteAll(JobID jobId);
-
-	/**
-	 * Cleans up the store and deletes all blobs.
-	 */
-	void cleanUp();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java
new file mode 100644
index 0000000..83cd9d4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import java.io.Closeable;
+
+/**
+ * Service interface for the BlobStore which allows to close and clean up its data.
+ */
+public interface BlobStoreService extends BlobStore, Closeable {
+
+	/**
+	 * Closes and cleans up the store. This entails the deletion of all blobs.
+	 */
+	void closeAndCleanupAllData();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 3c14f2f..8da362d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -22,8 +22,10 @@ import com.google.common.io.BaseEncoding;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.StringUtils;
@@ -41,6 +43,7 @@ import java.security.NoSuchAlgorithmException;
 import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 
 /**
  * Utility class to work with blob data.
@@ -78,18 +81,49 @@ public class BlobUtils {
 	 * @throws IOException
 	 * 		thrown if the (distributed) file storage cannot be created
 	 */
-	static BlobStore createBlobStoreFromConfig(Configuration config) throws IOException {
+	public static BlobStoreService createBlobStoreFromConfig(Configuration config) throws IOException {
 		HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config);
 
 		if (highAvailabilityMode == HighAvailabilityMode.NONE) {
 			return new VoidBlobStore();
 		} else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
-			return ZooKeeperHaServices.createBlobStore(config);
+			return createFileSystemBlobStore(config);
 		} else {
 			throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "'.");
 		}
 	}
 
+	private static BlobStoreService createFileSystemBlobStore(Configuration configuration) throws IOException {
+		String storagePath = configuration.getValue(
+			HighAvailabilityOptions.HA_STORAGE_PATH);
+		if (isNullOrWhitespaceOnly(storagePath)) {
+			throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " +
+				HighAvailabilityOptions.HA_STORAGE_PATH);
+		}
+
+		final Path path;
+		try {
+			path = new Path(storagePath);
+		} catch (Exception e) {
+			throw new IOException("Invalid path for highly available storage (" +
+				HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+		}
+
+		final FileSystem fileSystem;
+		try {
+			fileSystem = path.getFileSystem();
+		} catch (Exception e) {
+			throw new IOException("Could not create FileSystem for highly available storage (" +
+				HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+		}
+
+		final String clusterId =
+			configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
+		storagePath += "/" + clusterId;
+
+		return new FileSystemBlobStore(fileSystem, storagePath);
+	}
+
 	/**
 	 * Creates a storage directory for a blob service.
 	 *
@@ -246,10 +280,10 @@ public class BlobUtils {
 			@Override
 			public void run() {
 				try {
-					service.shutdown();
+					service.close();
 				}
 				catch (Throwable t) {
-					logger.error("Error during shutdown of blob service via JVM shutdown hook: " + t.getMessage(), t);
+					logger.error("Error during shutdown of blob service via JVM shutdown hook.", t);
 				}
 			}
 		});

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
new file mode 100644
index 0000000..11cf011
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * View on blobs stored in a {@link BlobStore}.
+ */
+public interface BlobView {
+
+	/**
+	 * Copies a blob to a local file.
+	 *
+	 * @param blobKey   The blob ID
+	 * @param localFile The local file to copy to
+	 * @throws IOException If the copy fails
+	 */
+	void get(BlobKey blobKey, File localFile) throws IOException;
+
+	/**
+	 * Copies a blob to a local file.
+	 *
+	 * @param jobId     The JobID part of ID for the blob
+	 * @param key       The String part of ID for the blob
+	 * @param localFile The local file to copy to
+	 * @throws IOException If the copy fails
+	 */
+	void get(JobID jobId, String key, File localFile) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index 7cfce7a..b54756c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -41,7 +41,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>This is used in addition to the local blob storage for high availability.
  */
-public class FileSystemBlobStore implements BlobStore {
+public class FileSystemBlobStore implements BlobStoreService {
 
 	private static final Logger LOG = LoggerFactory.getLogger(FileSystemBlobStore.class);
 
@@ -157,14 +157,19 @@ public class FileSystemBlobStore implements BlobStore {
 	}
 
 	@Override
-	public void cleanUp() {
+	public void closeAndCleanupAllData() {
 		try {
 			LOG.debug("Cleaning up {}.", basePath);
 
 			fileSystem.delete(new Path(basePath), true);
 		}
 		catch (Exception e) {
-			LOG.error("Failed to clean up recovery directory.");
+			LOG.error("Failed to clean up recovery directory.", e);
 		}
 	}
+
+	@Override
+	public void close() throws IOException {
+		// nothing to do for the FileSystemBlobStore
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
index 8606844..c14d082 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
@@ -26,7 +26,7 @@ import java.io.IOException;
 /**
  * A blob store doing nothing.
  */
-public class VoidBlobStore implements BlobStore {
+public class VoidBlobStore implements BlobStoreService {
 
 	@Override
 	public void put(File localFile, BlobKey blobKey) throws IOException {
@@ -57,6 +57,8 @@ public class VoidBlobStore implements BlobStore {
 	}
 
 	@Override
-	public void cleanUp() {
-	}
+	public void closeAndCleanupAllData() {}
+
+	@Override
+	public void close() throws IOException {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index b570383..86d927a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -191,7 +191,8 @@ public class JobClient {
 	public static ClassLoader retrieveClassLoader(
 		JobID jobID,
 		ActorGateway jobManager,
-		Configuration config)
+		Configuration config,
+		HighAvailabilityServices highAvailabilityServices)
 		throws JobRetrievalException {
 
 		final Object jmAnswer;
@@ -213,7 +214,8 @@ public class JobClient {
 			InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
 			final BlobCache blobClient;
 			try {
-				blobClient = new BlobCache(serverAddress, config);
+				// TODO: Fix lifecycle of BlobCache to properly close it upon usage
+				blobClient = new BlobCache(serverAddress, config, highAvailabilityServices.createBlobStore());
 			} catch (IOException e) {
 				throw new JobRetrievalException(jobID,
 					"Failed to setup blob cache", e);
@@ -229,7 +231,12 @@ public class JobClient {
 				try {
 					allURLs[pos++] = blobClient.getURL(blobKey);
 				} catch (Exception e) {
-					blobClient.shutdown();
+					try {
+						blobClient.close();
+					} catch (IOException ioe) {
+						LOG.warn("Could not properly close the BlobClient.", ioe);
+					}
+
 					throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey, e);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
index fe8c34c..bb448be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
@@ -134,7 +134,11 @@ public final class JobListeningContext {
 	public ClassLoader getClassLoader() throws JobRetrievalException {
 		if (classLoader == null) {
 			// lazily initializes the class loader when it is needed
-			classLoader = JobClient.retrieveClassLoader(jobID, getJobManager(), configuration);
+			classLoader = JobClient.retrieveClassLoader(
+				jobID,
+				getJobManager(),
+				configuration,
+				highAvailabilityServices);
 			LOG.info("Reconstructed class loader for Job {}", jobID);
 		}
 		return classLoader;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index ea508d1..5bdfe1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -31,7 +31,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.util.NetUtils;
@@ -191,13 +190,12 @@ public class BootstrapTools {
 		if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
 			logger.info("Starting JobManager Web Frontend");
 
-			LeaderRetrievalService leaderRetrievalService =
-				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
-
 			// start the web frontend. we need to load this dynamically
 			// because it is not in the same project/dependencies
 			WebMonitor monitor = WebMonitorUtils.startWebRuntimeMonitor(
-				config, leaderRetrievalService, actorSystem);
+				config,
+				highAvailabilityServices,
+				actorSystem);
 
 			// start the web monitor
 			if (monitor != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index b0d5d83..0702a11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -208,7 +208,7 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 			LOG.warn("Failed to run clean up task before shutdown", t);
 		}
 
-		blobService.shutdown();
+		blobService.close();
 		cleanupTimer.cancel();
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index c9e2957..2ebfd20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.highavailability;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.blob.BlobUtils;
 import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
 import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
 import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
@@ -49,10 +51,13 @@ public class HighAvailabilityServicesUtils {
 				return new EmbeddedHaServices(executor);
 
 			case ZOOKEEPER:
+				BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
+
 				return new ZooKeeperHaServices(
 					ZooKeeperUtils.startCuratorFramework(config),
 					executor,
-					config);
+					config,
+					blobStoreService);
 
 			default:
 				throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
@@ -85,10 +90,13 @@ public class HighAvailabilityServicesUtils {
 
 				return new StandaloneHaServices(resourceManagerRpcUrl, jobManagerRpcUrl);
 			case ZOOKEEPER:
+				BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
+
 				return new ZooKeeperHaServices(
 					ZooKeeperUtils.startCuratorFramework(configuration),
 					executor,
-					configuration);
+					configuration,
+					blobStoreService);
 			default:
 				throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index ac90e3f..9c3d986 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -44,10 +44,13 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 
 	private final RunningJobsRegistry runningJobsRegistry;
 
+	private final VoidBlobStore voidBlobStore;
+
 	private boolean shutdown;
 
 	public AbstractNonHaServices() {
 		this.runningJobsRegistry = new StandaloneRunningJobsRegistry();
+		this.voidBlobStore = new VoidBlobStore();
 
 		shutdown = false;
 	}
@@ -88,7 +91,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 		synchronized (lock) {
 			checkNotShutdown();
 
-			return new VoidBlobStore();
+			return voidBlobStore;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 5d895c1..d4748cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -23,11 +23,8 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobStore;
-import org.apache.flink.runtime.blob.FileSystemBlobStore;
+import org.apache.flink.runtime.blob.BlobStoreService;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -36,12 +33,12 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 
 /**
  * An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper.
@@ -102,11 +99,20 @@ public class ZooKeeperHaServices implements HighAvailabilityServices {
 	/** The zookeeper based running jobs registry */
 	private final RunningJobsRegistry runningJobsRegistry;
 
-	public ZooKeeperHaServices(CuratorFramework client, Executor executor, Configuration configuration) {
+	/** Store for arbitrary blobs */
+	private final BlobStoreService blobStoreService;
+
+	public ZooKeeperHaServices(
+			CuratorFramework client,
+			Executor executor,
+			Configuration configuration,
+			BlobStoreService blobStoreService) {
 		this.client = checkNotNull(client);
 		this.executor = checkNotNull(executor);
 		this.configuration = checkNotNull(configuration);
 		this.runningJobsRegistry = new ZooKeeperRunningJobsRegistry(client, configuration);
+
+		this.blobStoreService = checkNotNull(blobStoreService);
 	}
 
 	// ------------------------------------------------------------------------
@@ -150,61 +156,52 @@ public class ZooKeeperHaServices implements HighAvailabilityServices {
 
 	@Override
 	public BlobStore createBlobStore() throws IOException {
-		return createBlobStore(configuration);
+		return blobStoreService;
 	}
 
-	/**
-	 * Creates the BLOB store in which BLOBs are stored in a highly-available
-	 * fashion.
-	 *
-	 * @param configuration configuration to extract the storage path from
-	 * @return Blob store
-	 * @throws IOException if the blob store could not be created
-	 */
-	public static BlobStore createBlobStore(
-		final Configuration configuration) throws IOException {
-		String storagePath = configuration.getValue(
-			HighAvailabilityOptions.HA_STORAGE_PATH);
-		if (isNullOrWhitespaceOnly(storagePath)) {
-			throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " +
-					HighAvailabilityOptions.HA_STORAGE_PATH);
-		}
+	// ------------------------------------------------------------------------
+	//  Shutdown
+	// ------------------------------------------------------------------------
 
-		final Path path;
-		try {
-			path = new Path(storagePath);
-		} catch (Exception e) {
-			throw new IOException("Invalid path for highly available storage (" +
-					HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
-		}
+	@Override
+	public void close() throws Exception {
+		Throwable exception = null;
 
-		final FileSystem fileSystem;
 		try {
-			fileSystem = path.getFileSystem();
-		} catch (Exception e) {
-			throw new IOException("Could not create FileSystem for highly available storage (" +
-					HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+			blobStoreService.close();
+		} catch (Throwable t) {
+			exception = t;
 		}
 
-		final String clusterId =
-			configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
-		storagePath += "/" + clusterId;
+		internalClose();
 
-		return new FileSystemBlobStore(fileSystem, storagePath);
+		if (exception != null) {
+			ExceptionUtils.rethrowException(exception, "Could not properly close the ZooKeeperHaServices.");
+		}
 	}
 
-	// ------------------------------------------------------------------------
-	//  Shutdown
-	// ------------------------------------------------------------------------
-
 	@Override
-	public void close() throws Exception {
-		client.close();
+	public void closeAndCleanupAllData() throws Exception {
+		Throwable exception = null;
+
+		try {
+			blobStoreService.closeAndCleanupAllData();
+		} catch (Throwable t) {
+			exception = t;
+		}
+
+		internalClose();
+
+		if (exception != null) {
+			ExceptionUtils.rethrowException(exception, "Could not properly close and clean up all data of ZooKeeperHaServices.");
+		}
 	}
 
-	@Override
-	public void closeAndCleanupAllData() throws Exception {
-		close();
+	/**
+	 * Closes components which don't distinguish between close and closeAndCleanupAllData
+	 */
+	private void internalClose() {
+		client.close();
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index 8cda0f7..ac4d06f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -105,7 +105,7 @@ public class JobManagerServices {
 			Configuration config,
 			HighAvailabilityServices haServices) throws Exception {
 
-		final BlobServer blobServer = new BlobServer(config, haServices);
+		final BlobServer blobServer = new BlobServer(config, haServices.createBlobStore());
 
 		final long cleanupInterval = config.getLong(
 			ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index d05d900..a919065 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -923,7 +923,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 		final LibraryCacheManager libraryCacheManager;
 		try {
-			final BlobCache blobCache = new BlobCache(blobServerAddress, taskManagerConfiguration.getConfiguration(), haServices);
+			final BlobCache blobCache = new BlobCache(
+				blobServerAddress,
+				taskManagerConfiguration.getConfiguration(),
+				haServices.createBlobStore());
 			libraryCacheManager = new BlobLibraryCacheManager(
 				blobCache,
 				taskManagerConfiguration.getCleanupInterval());

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 2baadb5..106d0f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -28,10 +28,12 @@ import java.net.URI;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
@@ -117,12 +119,14 @@ public final class WebMonitorUtils {
 	 * Because failure to start the web runtime monitor is not considered fatal, this method does
 	 * not throw any exceptions, but only logs them.
 	 *
-	 * @param config                 The configuration for the runtime monitor.
-	 * @param leaderRetrievalService Leader retrieval service to get the leading JobManager
+	 * @param config The configuration for the runtime monitor.
+	 * @param highAvailabilityServices HighAvailabilityServices used to start the WebRuntimeMonitor
+	 * @param actorSystem ActorSystem used to connect to the JobManager
+	 *
 	 */
 	public static WebMonitor startWebRuntimeMonitor(
 			Configuration config,
-			LeaderRetrievalService leaderRetrievalService,
+			HighAvailabilityServices highAvailabilityServices,
 			ActorSystem actorSystem) {
 		// try to load and instantiate the class
 		try {
@@ -130,9 +134,14 @@ public final class WebMonitorUtils {
 			Class<? extends WebMonitor> clazz = Class.forName(classname).asSubclass(WebMonitor.class);
 			
 			Constructor<? extends WebMonitor> constructor = clazz.getConstructor(Configuration.class,
-					LeaderRetrievalService.class,
-					ActorSystem.class);
-			return constructor.newInstance(config, leaderRetrievalService, actorSystem);
+				LeaderRetrievalService.class,
+				BlobView.class,
+				ActorSystem.class);
+			return constructor.newInstance(
+				config,
+				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+				highAvailabilityServices.createBlobStore(),
+				actorSystem);
 		} catch (ClassNotFoundException e) {
 			LOG.error("Could not load web runtime monitor. " +
 					"Probably reason: flink-runtime-web is not in the classpath");

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 5092643..1ea783b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -36,7 +36,7 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup
 import org.apache.flink.metrics.{Gauge, MetricGroup}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
-import org.apache.flink.runtime.blob.BlobServer
+import org.apache.flink.runtime.blob.{BlobServer, BlobStore}
 import org.apache.flink.runtime.checkpoint._
 import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, SavepointStore}
 import org.apache.flink.runtime.client._
@@ -46,7 +46,7 @@ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.concurrent.{AcceptFunction, ApplyFunction, BiFunction, Executors => FlinkExecutors}
 import org.apache.flink.runtime.execution.SuppressRestartsException
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.executiongraph._
 import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
@@ -2274,14 +2274,12 @@ object JobManager {
     val webMonitor: Option[WebMonitor] =
       if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
         LOG.info("Starting JobManager web frontend")
-        val leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
-          HighAvailabilityServices.DEFAULT_JOB_ID)
 
         // start the web frontend. we need to load this dynamically
         // because it is not in the same project/dependencies
         val webServer = WebMonitorUtils.startWebRuntimeMonitor(
           configuration,
-          leaderRetrievalService,
+          highAvailabilityServices,
           jobManagerSystem)
 
         Option(webServer)
@@ -2507,12 +2505,14 @@ object JobManager {
    * @param configuration The configuration from which to parse the config values.
    * @param futureExecutor to run JobManager's futures
    * @param ioExecutor to run blocking io operations
+   * @param blobStore to store blobs persistently
    * @return The members for a default JobManager.
    */
   def createJobManagerComponents(
       configuration: Configuration,
       futureExecutor: ScheduledExecutorService,
-      ioExecutor: Executor) :
+      ioExecutor: Executor,
+      blobStore: BlobStore) :
     (InstanceManager,
     FlinkScheduler,
     BlobLibraryCacheManager,
@@ -2558,7 +2558,7 @@ object JobManager {
     var libraryCacheManager: BlobLibraryCacheManager = null
 
     try {
-      blobServer = new BlobServer(configuration)
+      blobServer = new BlobServer(configuration, blobStore)
       instanceManager = new InstanceManager()
       scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor))
       libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval)
@@ -2577,7 +2577,7 @@ object JobManager {
           instanceManager.shutdown()
         }
         if (blobServer != null) {
-          blobServer.shutdown()
+          blobServer.close()
         }
         
         throw t
@@ -2689,7 +2689,8 @@ object JobManager {
     metricsRegistry) = createJobManagerComponents(
       configuration,
       futureExecutor,
-      ioExecutor)
+      ioExecutor,
+      highAvailabilityServices.createBlobStore())
 
     val archiveProps = getArchiveProps(archiveClass, archiveCount, archivePath)
 
@@ -2745,7 +2746,7 @@ object JobManager {
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
-    libraryCacheManager: BlobLibraryCacheManager,
+    libraryCacheManager: LibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
     timeout: FiniteDuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 46c4404..2ace8db 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -37,7 +37,7 @@ import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, High
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
-import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService, StandaloneLeaderRetrievalService}
+import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
@@ -387,17 +387,13 @@ abstract class FlinkMiniCluster(
       config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false) &&
         config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
 
-      // TODO: Add support for HA: Make web server work independently from the JM
-      val leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
-        HighAvailabilityServices.DEFAULT_JOB_ID)
-
       LOG.info("Starting JobManger web frontend")
       // start the new web frontend. we need to load this dynamically
       // because it is not in the same project/dependencies
       val webServer = Option(
         WebMonitorUtils.startWebRuntimeMonitor(
           config,
-          leaderRetrievalService,
+          highAvailabilityServices,
           actorSystem)
       )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 8677307..a535388 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -143,7 +143,8 @@ class LocalFlinkMiniCluster(
     metricsRegistry) = JobManager.createJobManagerComponents(
       config,
       futureExecutor,
-      ioExecutor)
+      ioExecutor,
+      highAvailabilityServices.createBlobStore())
 
     if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
       metricsRegistry.get.startQueryService(system, null)
@@ -249,8 +250,6 @@ class LocalFlinkMiniCluster(
       taskManagerServices.getMemoryManager(),
       taskManagerServices.getIOManager(),
       taskManagerServices.getNetworkEnvironment,
-      highAvailabilityServices.getJobManagerLeaderRetriever(
-        HighAvailabilityServices.DEFAULT_JOB_ID),
       metricRegistry)
 
     if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
@@ -315,7 +314,6 @@ class LocalFlinkMiniCluster(
     memoryManager: MemoryManager,
     ioManager: IOManager,
     networkEnvironment: NetworkEnvironment,
-    leaderRetrievalService: LeaderRetrievalService,
     metricsRegistry: MetricRegistry): Props = {
 
     TaskManager.getTaskManagerProps(
@@ -326,7 +324,7 @@ class LocalFlinkMiniCluster(
       memoryManager,
       ioManager,
       networkEnvironment,
-      leaderRetrievalService,
+      highAvailabilityServices,
       metricsRegistry)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a3110a4..7684a6b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -125,7 +125,7 @@ class TaskManager(
     protected val ioManager: IOManager,
     protected val network: NetworkEnvironment,
     protected val numberOfSlots: Int,
-    protected val leaderRetrievalService: LeaderRetrievalService,
+    protected val highAvailabilityServices: HighAvailabilityServices,
     protected val metricsRegistry: FlinkMetricRegistry)
   extends FlinkActor
   with LeaderSessionMessageFilter // Mixin order is important: We want to filter after logging
@@ -149,6 +149,10 @@ class TaskManager(
   /** Handler for distributed files cached by this TaskManager */
   protected val fileCache = new FileCache(config.getTmpDirectories())
 
+  protected val leaderRetrievalService: LeaderRetrievalService = highAvailabilityServices.
+    getJobManagerLeaderRetriever(
+      HighAvailabilityServices.DEFAULT_JOB_ID)
+
   private var taskManagerMetricGroup : TaskManagerMetricGroup = _
 
   /** Actors which want to be notified once this task manager has been
@@ -959,7 +963,10 @@ class TaskManager(
       log.info(s"Determined BLOB server address to be $address. Starting BLOB cache.")
 
       try {
-        val blobcache = new BlobCache(address, config.getConfiguration())
+        val blobcache = new BlobCache(
+          address,
+          config.getConfiguration(),
+          highAvailabilityServices.createBlobStore())
         blobService = Option(blobcache)
         libraryCacheManager = Some(
           new BlobLibraryCacheManager(blobcache, config.getCleanupInterval()))
@@ -1039,12 +1046,24 @@ class TaskManager(
 
     // shut down BLOB and library cache
     libraryCacheManager foreach {
-      manager => manager.shutdown()
+      manager =>
+        try {
+          manager.shutdown()
+        } catch {
+          case ioe: IOException => log.error(
+            "Could not properly shutdown library cache manager.",
+            ioe)
+        }
     }
     libraryCacheManager = None
 
     blobService foreach {
-      service => service.shutdown()
+      service =>
+        try {
+          service.close()
+        } catch {
+          case ioe: IOException => log.error("Could not properly shutdown blob service.", ioe)
+        }
     }
     blobService = None
 
@@ -1905,9 +1924,6 @@ object TaskManager {
 
     val metricRegistry = taskManagerServices.getMetricRegistry()
 
-    val leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
-      HighAvailabilityServices.DEFAULT_JOB_ID)
-
     // create the actor properties (which define the actor constructor parameters)
     val tmProps = getTaskManagerProps(
       taskManagerClass,
@@ -1917,7 +1933,7 @@ object TaskManager {
       taskManagerServices.getMemoryManager(),
       taskManagerServices.getIOManager(),
       taskManagerServices.getNetworkEnvironment(),
-      leaderRetrievalService,
+      highAvailabilityServices,
       metricRegistry)
 
     metricRegistry.startQueryService(actorSystem, resourceID)
@@ -1936,7 +1952,7 @@ object TaskManager {
     memoryManager: MemoryManager,
     ioManager: IOManager,
     networkEnvironment: NetworkEnvironment,
-    leaderRetrievalService: LeaderRetrievalService,
+    highAvailabilityServices: HighAvailabilityServices,
     metricsRegistry: FlinkMetricRegistry
   ): Props = {
     Props(
@@ -1948,7 +1964,7 @@ object TaskManager {
       ioManager,
       networkEnvironment,
       taskManagerConfig.getNumberSlots(),
-      leaderRetrievalService,
+      highAvailabilityServices,
       metricsRegistry)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
index 34a8a39..1cf77ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
@@ -43,10 +43,10 @@ public class BlobCacheRetriesTest {
 	 * A test where the connection fails twice and then the get operation succeeds.
 	 */
 	@Test
-	public void testBlobFetchRetries() {
+	public void testBlobFetchRetries() throws IOException {
 		final Configuration config = new Configuration();
 
-		testBlobFetchRetries(config);
+		testBlobFetchRetries(config, new VoidBlobStore());
 	}
 
 	/**
@@ -54,13 +54,23 @@ public class BlobCacheRetriesTest {
 	 * (with high availability set).
 	 */
 	@Test
-	public void testBlobFetchRetriesHa() {
+	public void testBlobFetchRetriesHa() throws IOException {
 		final Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
 			temporaryFolder.getRoot().getPath());
 
-		testBlobFetchRetries(config);
+		BlobStoreService blobStoreService = null;
+
+		try {
+			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
+
+			testBlobFetchRetries(config, blobStoreService);
+		} finally {
+			if (blobStoreService != null) {
+				blobStoreService.closeAndCleanupAllData();
+			}
+		}
 	}
 
 	/**
@@ -71,14 +81,14 @@ public class BlobCacheRetriesTest {
 	 * 		configuration to use (the BlobCache will get some additional settings
 	 * 		set compared to this one)
 	 */
-	private void testBlobFetchRetries(final Configuration config) {
+	private void testBlobFetchRetries(final Configuration config, final BlobStore blobStore) throws IOException {
 		final byte[] data = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
 
 		BlobServer server = null;
 		BlobCache cache = null;
 		try {
 
-			server = new TestingFailingBlobServer(config, 2);
+			server = new TestingFailingBlobServer(config, blobStore, 2);
 
 			final InetSocketAddress
 				serverAddress = new InetSocketAddress("localhost", server.getPort());
@@ -97,13 +107,7 @@ public class BlobCacheRetriesTest {
 				}
 			}
 
-			// create a separate config for the cache with no access to
-			// the (shared) storage path if available so that the cache
-			// will always bother the BlobServer!
-			final Configuration cacheConfig = new Configuration(config);
-			cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
-				temporaryFolder.getRoot().getPath() + "/does-not-exist");
-			cache = new BlobCache(serverAddress, cacheConfig);
+			cache = new BlobCache(serverAddress, config, new VoidBlobStore());
 
 			// trigger a download - it should fail the first two times, but retry, and succeed eventually
 			URL url = cache.getURL(key);
@@ -116,17 +120,12 @@ public class BlobCacheRetriesTest {
 			finally {
 				is.close();
 			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			if (cache != null) {
-				cache.shutdown();
+				cache.close();
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}
@@ -135,10 +134,10 @@ public class BlobCacheRetriesTest {
 	 * A test where the connection fails too often and eventually fails the GET request.
 	 */
 	@Test
-	public void testBlobFetchWithTooManyFailures() {
+	public void testBlobFetchWithTooManyFailures() throws IOException {
 		final Configuration config = new Configuration();
 
-		testBlobFetchWithTooManyFailures(config);
+		testBlobFetchWithTooManyFailures(config, new VoidBlobStore());
 	}
 
 	/**
@@ -146,13 +145,23 @@ public class BlobCacheRetriesTest {
 	 * (with high availability set).
 	 */
 	@Test
-	public void testBlobFetchWithTooManyFailuresHa() {
+	public void testBlobFetchWithTooManyFailuresHa() throws IOException {
 		final Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
 			temporaryFolder.getRoot().getPath());
 
-		testBlobFetchWithTooManyFailures(config);
+		BlobStoreService blobStoreService = null;
+
+		try {
+			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
+
+			testBlobFetchWithTooManyFailures(config, blobStoreService);
+		} finally {
+			if (blobStoreService != null) {
+				blobStoreService.closeAndCleanupAllData();
+			}
+		}
 	}
 
 	/**
@@ -163,14 +172,14 @@ public class BlobCacheRetriesTest {
 	 * 		configuration to use (the BlobCache will get some additional settings
 	 * 		set compared to this one)
 	 */
-	private void testBlobFetchWithTooManyFailures(final Configuration config) {
+	private void testBlobFetchWithTooManyFailures(final Configuration config, final BlobStore blobStore) throws IOException {
 		final byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
 
 		BlobServer server = null;
 		BlobCache cache = null;
 		try {
 
-			server = new TestingFailingBlobServer(config, 10);
+			server = new TestingFailingBlobServer(config, blobStore, 10);
 
 			final InetSocketAddress
 				serverAddress = new InetSocketAddress("localhost", server.getPort());
@@ -189,13 +198,7 @@ public class BlobCacheRetriesTest {
 				}
 			}
 
-			// create a separate config for the cache with no access to
-			// the (shared) storage path if available so that the cache
-			// will always bother the BlobServer!
-			final Configuration cacheConfig = new Configuration(config);
-			cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
-				temporaryFolder.getRoot().getPath() + "/does-not-exist");
-			cache = new BlobCache(serverAddress, cacheConfig);
+			cache = new BlobCache(serverAddress, config, new VoidBlobStore());
 
 			// trigger a download - it should fail eventually
 			try {
@@ -206,16 +209,12 @@ public class BlobCacheRetriesTest {
 				// as we expected
 			}
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
 		finally {
 			if (cache != null) {
-				cache.shutdown();
+				cache.close();
 			}
 			if (server != null) {
-				server.shutdown();
+				server.close();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
index db55331..2a65a3b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -25,6 +25,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -49,7 +50,7 @@ public class BlobCacheSuccessTest {
 	 * BlobServer.
 	 */
 	@Test
-	public void testBlobCache() {
+	public void testBlobCache() throws IOException {
 		Configuration config = new Configuration();
 		uploadFileGetTest(config, false, false);
 	}
@@ -60,7 +61,7 @@ public class BlobCacheSuccessTest {
 	 * BlobServer.
 	 */
 	@Test
-	public void testBlobCacheHa() {
+	public void testBlobCacheHa() throws IOException {
 		Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
@@ -73,7 +74,7 @@ public class BlobCacheSuccessTest {
 	 * file system and thus needs to download BLOBs from the BlobServer.
 	 */
 	@Test
-	public void testBlobCacheHaFallback() {
+	public void testBlobCacheHaFallback() throws IOException {
 		Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
@@ -82,17 +83,30 @@ public class BlobCacheSuccessTest {
 	}
 
 	private void uploadFileGetTest(final Configuration config, boolean cacheWorksWithoutServer,
-		boolean cacheHasAccessToFs) {
+		boolean cacheHasAccessToFs) throws IOException {
 		// First create two BLOBs and upload them to BLOB server
 		final byte[] buf = new byte[128];
 		final List<BlobKey> blobKeys = new ArrayList<BlobKey>(2);
 
 		BlobServer blobServer = null;
 		BlobCache blobCache = null;
+		BlobStoreService blobStoreService = null;
 		try {
+			final Configuration cacheConfig;
+			if (cacheHasAccessToFs) {
+				cacheConfig = config;
+			} else {
+				// just in case parameters are still read from the server,
+				// create a separate configuration object for the cache
+				cacheConfig = new Configuration(config);
+				cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
+					temporaryFolder.getRoot().getPath() + "/does-not-exist");
+			}
+
+			blobStoreService = BlobUtils.createBlobStoreFromConfig(cacheConfig);
 
 			// Start the BLOB server
-			blobServer = new BlobServer(config);
+			blobServer = new BlobServer(config, blobStoreService);
 			final InetSocketAddress serverAddress = new InetSocketAddress(blobServer.getPort());
 
 			// Upload BLOBs
@@ -112,22 +126,11 @@ public class BlobCacheSuccessTest {
 
 			if (cacheWorksWithoutServer) {
 				// Now, shut down the BLOB server, the BLOBs must still be accessible through the cache.
-				blobServer.shutdown();
+				blobServer.close();
 				blobServer = null;
 			}
 
-			final Configuration cacheConfig;
-			if (cacheHasAccessToFs) {
-				cacheConfig = config;
-			} else {
-				// just in case parameters are still read from the server,
-				// create a separate configuration object for the cache
-				cacheConfig = new Configuration(config);
-				cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
-					temporaryFolder.getRoot().getPath() + "/does-not-exist");
-			}
-
-			blobCache = new BlobCache(serverAddress, cacheConfig);
+			blobCache = new BlobCache(serverAddress, cacheConfig, blobStoreService);
 
 			for (BlobKey blobKey : blobKeys) {
 				blobCache.getURL(blobKey);
@@ -135,7 +138,7 @@ public class BlobCacheSuccessTest {
 
 			if (blobServer != null) {
 				// Now, shut down the BLOB server, the BLOBs must still be accessible through the cache.
-				blobServer.shutdown();
+				blobServer.close();
 				blobServer = null;
 			}
 
@@ -162,18 +165,17 @@ public class BlobCacheSuccessTest {
 					fail(e.getMessage());
 				}
 			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
+		} finally {
 			if (blobServer != null) {
-				blobServer.shutdown();
+				blobServer.close();
 			}
 
 			if(blobCache != null){
-				blobCache.shutdown();
+				blobCache.close();
+			}
+
+			if (blobStoreService != null) {
+				blobStoreService.closeAndCleanupAllData();
 			}
 		}
 	}


Mime
View raw message