flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [09/47] flink git commit: [FLINK-2805] [blobmanager] Write JARs to file state backend for recovery
Date Tue, 20 Oct 2015 07:59:00 GMT
[FLINK-2805] [blobmanager] Write JARs to file state backend for recovery

Move StateBackend enum to top level and org.apache.flink.runtime.state

Abstract blob store in blob server for recovery

This closes #1227.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c3a4d1d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c3a4d1d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c3a4d1d9

Branch: refs/heads/master
Commit: c3a4d1d9f720a1da9697d0bbf48f7a3b1f5851b8
Parents: c2989f2
Author: Ufuk Celebi <uce@apache.org>
Authored: Mon Oct 5 14:30:46 2015 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Oct 20 00:16:52 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/blob/BlobServer.java   | 105 +++++++++--
 .../runtime/blob/BlobServerConnection.java      |  52 ++++--
 .../apache/flink/runtime/blob/BlobStore.java    |  97 ++++++++++
 .../apache/flink/runtime/blob/BlobUtils.java    |  75 +++++++-
 .../flink/runtime/blob/FileSystemBlobStore.java | 186 +++++++++++++++++++
 .../flink/runtime/blob/VoidBlobStore.java       |  61 ++++++
 .../flink/runtime/jobmanager/RecoveryMode.java  |  12 +-
 .../flink/runtime/blob/BlobRecoveryITCase.java  | 159 ++++++++++++++++
 .../BlobLibraryCacheRecoveryITCase.java         | 176 ++++++++++++++++++
 9 files changed, 874 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index ef2ef61..d0bed8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -18,6 +18,14 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -30,13 +38,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.common.JobID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * This class implements the BLOB server. The BLOB server is responsible for listening for incoming requests and
@@ -57,12 +59,12 @@ public class BlobServer extends Thread implements BlobService {
 	/** Indicates whether a shutdown of server component has been requested. */
 	private final AtomicBoolean shutdownRequested = new AtomicBoolean();
 
-	/** Shutdown hook thread to ensure deletion of the storage directory. */
-	private final Thread shutdownHook;
-
 	/** Is the root directory for file storage */
 	private final File storageDir;
 
+	/** Blob store for recovery */
+	private final BlobStore blobStore;
+
 	/** Set of currently running threads */
 	private final Set<BlobServerConnection> activeConnections = new HashSet<BlobServerConnection>();
 
@@ -70,18 +72,43 @@ public class BlobServer extends Thread implements BlobService {
 	private final int maxConnections;
 
 	/**
+	 * Shutdown hook thread to ensure deletion of the storage directory (or <code>null</code> if
+	 * the configured recovery mode does not equal{@link RecoveryMode#STANDALONE})
+	 */
+	private final Thread shutdownHook;
+
+	/**
 	 * Instantiates a new BLOB server and binds it to a free network port.
-	 * 
+	 *
 	 * @throws IOException
 	 *         thrown if the BLOB server cannot bind to a free network port
 	 */
 	public BlobServer(Configuration config) throws IOException {
+		checkNotNull(config, "Configuration");
+
+		RecoveryMode recoveryMode = RecoveryMode.fromConfig(config);
 
 		// configure and create the storage directory
 		String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
 		this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
 		LOG.info("Created BLOB server storage directory {}", storageDir);
 
+		// No recovery.
+		if (recoveryMode == RecoveryMode.STANDALONE) {
+			this.blobStore = new VoidBlobStore();
+		}
+		// Recovery. Check that everything has been setup correctly. This is not clean, but it's
+		// better to resolve this with some upcoming changes to the state backend setup.
+		else if (config.containsKey(ConfigConstants.STATE_BACKEND) &&
+				config.containsKey(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH)) {
+
+			this.blobStore = new FileSystemBlobStore(config);
+		}
+		// Fallback.
+		else {
+			this.blobStore = new VoidBlobStore();
+		}
+
 		// configure the maximum number of concurrent connections
 		final int maxConnections = config.getInteger(
 				ConfigConstants.BLOB_FETCH_CONCURRENT_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT);
@@ -102,8 +129,13 @@ public class BlobServer extends Thread implements BlobService {
 			backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
 		}
 
-		// Add shutdown hook to delete storage directory
-		this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+		if (recoveryMode == RecoveryMode.STANDALONE) {
+			// Add shutdown hook to delete storage directory
+			this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+		}
+		else {
+			this.shutdownHook = null;
+		}
 
 		// start the server
 		try {
@@ -132,37 +164,43 @@ public class BlobServer extends Thread implements BlobService {
 	 * Returns a file handle to the file associated with the given blob key on the blob
 	 * server.
 	 *
+	 * <p><strong>This is only called from the {@link BlobServerConnection}</strong>
+	 *
 	 * @param key identifying the file
 	 * @return file handle to the file
 	 */
-	public File getStorageLocation(BlobKey key) {
+	File getStorageLocation(BlobKey key) {
 		return BlobUtils.getStorageLocation(storageDir, key);
 	}
 
 	/**
 	 * Returns a file handle to the file identified by the given jobID and key.
 	 *
+	 * <p><strong>This is only called from the {@link BlobServerConnection}</strong>
+	 *
 	 * @param jobID to which the file is associated
 	 * @param key to identify the file within the job context
 	 * @return file handle to the file
 	 */
-	public File getStorageLocation(JobID jobID, String key) {
+	File getStorageLocation(JobID jobID, String key) {
 		return BlobUtils.getStorageLocation(storageDir, jobID, key);
 	}
 
 	/**
 	 * Method which deletes all files associated with the given jobID.
 	 *
+	 * <p><strong>This is only called from the {@link BlobServerConnection}</strong>
+	 *
 	 * @param jobID all files associated to this jobID will be deleted
 	 * @throws IOException
 	 */
-	public void deleteJobDirectory(JobID jobID) throws IOException {
+	void deleteJobDirectory(JobID jobID) throws IOException {
 		BlobUtils.deleteJobDirectory(storageDir, jobID);
 	}
 
 	/**
 	 * Returns a temporary file inside the BLOB server's incoming directory.
-	 * 
+	 *
 	 * @return a temporary file inside the BLOB server's incoming directory
 	 */
 	File createTemporaryFilename() {
@@ -170,6 +208,13 @@ public class BlobServer extends Thread implements BlobService {
 				String.format("temp-%08d", tempFileCounter.getAndIncrement()));
 	}
 
+	/**
+	 * Returns the blob store.
+	 */
+	BlobStore getBlobStore() {
+		return blobStore;
+	}
+
 	@Override
 	public void run() {
 		try {
@@ -245,6 +290,9 @@ public class BlobServer extends Thread implements BlobService {
 				LOG.error("BLOB server failed to properly clean up its storage directory.");
 			}
 
+			// Clean up the recovery directory
+			blobStore.cleanUp();
+
 			// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
 			// shutdown hook itself
 			if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
@@ -282,11 +330,26 @@ public class BlobServer extends Thread implements BlobService {
 
 		final File localFile = BlobUtils.getStorageLocation(storageDir, requiredBlob);
 
-		if (!localFile.exists()) {
-			throw new FileNotFoundException("File " + localFile.getCanonicalPath() + " does not exist.");
-		} else {
+		if (localFile.exists()) {
 			return localFile.toURI().toURL();
 		}
+		else {
+			try {
+				// Try the blob store
+				blobStore.get(requiredBlob, localFile);
+			}
+			catch (Exception e) {
+				throw new IOException("Failed to copy from blob store.", e);
+			}
+
+			if (localFile.exists()) {
+				return localFile.toURI().toURL();
+			}
+			else {
+				throw new FileNotFoundException("Local file " + localFile + " does not exist " +
+						"and failed to copy from blob store.");
+			}
+		}
 	}
 
 	/**
@@ -305,6 +368,8 @@ public class BlobServer extends Thread implements BlobService {
 				LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
 			}
 		}
+
+		blobStore.delete(key);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index 793a9d6..d7bba8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -18,6 +18,12 @@
 
 package org.apache.flink.runtime.blob;
 
+import com.google.common.io.Files;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
@@ -29,28 +35,21 @@ import java.net.Socket;
 import java.net.SocketException;
 import java.security.MessageDigest;
 
-import com.google.common.io.Files;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.util.InstantiationUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_ID_SCOPE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.MAX_KEY_LENGTH;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.NAME_ADDRESSABLE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
 import static org.apache.flink.runtime.blob.BlobUtils.closeSilently;
 import static org.apache.flink.runtime.blob.BlobUtils.readFully;
 import static org.apache.flink.runtime.blob.BlobUtils.readLength;
 import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
 
-import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.MAX_KEY_LENGTH;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
-
 /**
  * A BLOB connection handles a series of requests from a particular BLOB client.
  */
@@ -181,10 +180,18 @@ class BlobServerConnection extends Thread {
 				JobID jobID = JobID.fromByteArray(jidBytes);
 				String key = readKey(buf, inputStream);
 				blobFile = this.blobServer.getStorageLocation(jobID, key);
+
+				if (!blobFile.exists()) {
+					blobServer.getBlobStore().get(jobID, key, blobFile);
+				}
 			}
 			else if (contentAddressable == CONTENT_ADDRESSABLE) {
 				final BlobKey key = BlobKey.readFromInputStream(inputStream);
 				blobFile = blobServer.getStorageLocation(key);
+
+				if (!blobFile.exists()) {
+					blobServer.getBlobStore().get(key, blobFile);
+				}
 			}
 			else {
 				throw new IOException("Unknown type of BLOB addressing.");
@@ -194,6 +201,7 @@ class BlobServerConnection extends Thread {
 			if (!blobFile.exists()) {
 				throw new IOException("Cannot find required BLOB at " + blobFile.getAbsolutePath());
 			}
+
 			if (blobFile.length() > Integer.MAX_VALUE) {
 				throw new IOException("BLOB size exceeds the maximum size (2 GB).");
 			}
@@ -220,8 +228,7 @@ class BlobServerConnection extends Thread {
 			int blobLen = (int) blobFile.length();
 			writeLength(blobLen, outputStream);
 
-			FileInputStream fis = new FileInputStream(blobFile);
-			try {
+			try (FileInputStream fis = new FileInputStream(blobFile)) {
 				int bytesRemaining = blobLen;
 				while (bytesRemaining > 0) {
 					int read = fis.read(buf);
@@ -231,8 +238,6 @@ class BlobServerConnection extends Thread {
 					outputStream.write(buf, 0, read);
 					bytesRemaining -= read;
 				}
-			} finally {
-				fis.close();
 			}
 		}
 		catch (SocketException e) {
@@ -314,6 +319,9 @@ class BlobServerConnection extends Thread {
 				File storageFile = this.blobServer.getStorageLocation(jobID, key);
 				Files.move(incomingFile, storageFile);
 				incomingFile = null;
+
+				blobServer.getBlobStore().put(storageFile, jobID, key);
+
 				outputStream.write(RETURN_OKAY);
 			}
 			else {
@@ -322,6 +330,8 @@ class BlobServerConnection extends Thread {
 				Files.move(incomingFile, storageFile);
 				incomingFile = null;
 
+				blobServer.getBlobStore().put(storageFile, blobKey);
+
 				// Return computed key to client for validation
 				outputStream.write(RETURN_OKAY);
 				blobKey.writeToOutputStream(outputStream);
@@ -379,6 +389,8 @@ class BlobServerConnection extends Thread {
 				if (blobFile.exists() && !blobFile.delete()) {
 					throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
 				}
+
+				blobServer.getBlobStore().delete(key);
 			}
 			else if (type == NAME_ADDRESSABLE) {
 				byte[] jidBytes = new byte[JobID.SIZE];
@@ -391,6 +403,8 @@ class BlobServerConnection extends Thread {
 				if (blobFile.exists() && !blobFile.delete()) {
 					throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
 				}
+
+				blobServer.getBlobStore().delete(jobID, key);
 			}
 			else if (type == JOB_ID_SCOPE) {
 				byte[] jidBytes = new byte[JobID.SIZE];
@@ -398,6 +412,8 @@ class BlobServerConnection extends Thread {
 				JobID jobID = JobID.fromByteArray(jidBytes);
 
 				blobServer.deleteJobDirectory(jobID);
+
+				blobServer.getBlobStore().deleteAll(jobID);
 			}
 			else {
 				throw new IOException("Unrecognized addressing type: " + type);

http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
new file mode 100644
index 0000000..1e72d91
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.File;
+
+/**
+ * A blob store.
+ */
+interface BlobStore {
+
+	/**
+	 * Copies the local file to the blob store.
+	 *
+	 * @param localFile The file to copy
+	 * @param blobKey   The ID for the file in the blob store
+	 * @throws Exception If the copy fails
+	 */
+	void put(File localFile, BlobKey blobKey) throws Exception;
+
+	/**
+	 * Copies a local file to the blob store.
+	 *
+	 * <p>The job ID and key make up a composite key for the file.
+	 *
+	 * @param localFile The file to copy
+	 * @param jobId     The JobID part of ID for the file in the blob store
+	 * @param key       The String part of ID for the file in the blob store
+	 * @throws Exception If the copy fails
+	 */
+	void put(File localFile, JobID jobId, String key) throws Exception;
+
+	/**
+	 * Copies a blob to a local file.
+	 *
+	 * @param blobKey   The blob ID
+	 * @param localFile The local file to copy to
+	 * @throws Exception If the copy fails
+	 */
+	void get(BlobKey blobKey, File localFile) throws Exception;
+
+	/**
+	 * Copies a blob to a local file.
+	 *
+	 * @param jobId     The JobID part of ID for the blob
+	 * @param key       The String part of ID for the blob
+	 * @param localFile The local file to copy to
+	 * @throws Exception If the copy fails
+	 */
+	void get(JobID jobId, String key, File localFile) throws Exception;
+
+	/**
+	 * Deletes a blob.
+	 *
+	 * @param blobKey The blob ID
+	 */
+	void delete(BlobKey blobKey);
+
+	/**
+	 * Deletes a blob.
+	 *
+	 * @param jobId The JobID part of ID for the blob
+	 * @param key   The String part of ID for the blob
+	 */
+	void delete(JobID jobId, String key);
+
+	/**
+	 * Deletes blobs.
+	 *
+	 * @param jobId The JobID part of all blobs to delete
+	 */
+	void deleteAll(JobID jobId);
+
+	/**
+	 * Cleans up the store and deletes all blobs.
+	 */
+	void cleanUp();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index c47ecf2..d8f744b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -21,14 +21,19 @@ package org.apache.flink.runtime.blob;
 import com.google.common.io.BaseEncoding;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.util.IOUtils;
 import org.slf4j.Logger;
 
 import java.io.EOFException;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
+import java.net.URI;
 import java.nio.charset.Charset;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
@@ -49,12 +54,12 @@ public class BlobUtils {
 	/**
 	 * The prefix of all BLOB files stored by the BLOB server.
 	 */
-	private static final String BLOB_FILE_PREFIX = "blob_";
+	static final String BLOB_FILE_PREFIX = "blob_";
 
 	/**
 	 * The prefix of all job-specific directories created by the BLOB server.
 	 */
-	private static final String JOB_DIR_PREFIX = "job_";
+	static final String JOB_DIR_PREFIX = "job_";
 
 	/**
 	 * The default character set to translate between characters and bytes.
@@ -103,7 +108,7 @@ public class BlobUtils {
 	static File getIncomingDirectory(File storageDir) {
 		final File incomingDir = new File(storageDir, "incoming");
 
-		if (!incomingDir.exists() && !incomingDir.mkdir()) {
+		if (!incomingDir.exists() && !incomingDir.mkdirs()) {
 			throw new RuntimeException("Cannot create directory for incoming files " + incomingDir.getAbsolutePath());
 		}
 
@@ -119,7 +124,7 @@ public class BlobUtils {
 	private static File getCacheDirectory(File storageDir) {
 		final File cacheDirectory = new File(storageDir, "cache");
 
-		if (!cacheDirectory.exists() && !cacheDirectory.mkdir()) {
+		if (!cacheDirectory.exists() && !cacheDirectory.mkdirs()) {
 			throw new RuntimeException("Could not create cache directory '" + cacheDirectory.getAbsolutePath() + "'.");
 		}
 
@@ -174,7 +179,7 @@ public class BlobUtils {
 	 *        the user's key for a BLOB
 	 * @return the internal name for the BLOB as used by the BLOB server
 	 */
-	private static String encodeKey(String key) {
+	static String encodeKey(String key) {
 		return BaseEncoding.base64().encode(key.getBytes(DEFAULT_CHARSET));
 	}
 
@@ -327,6 +332,66 @@ public class BlobUtils {
 	}
 
 	/**
+	 * Returns the path for the given blob key.
+	 *
+	 * <p>The returned path can be used with the state backend for recovery purposes.
+	 *
+	 * <p>This follows the same scheme as {@link #getStorageLocation(File, BlobKey)}.
+	 */
+	static String getRecoveryPath(String basePath, BlobKey blobKey) {
+		// format: $base/cache/blob_$key
+		return String.format("%s/cache/%s", basePath, BLOB_FILE_PREFIX + blobKey.toString());
+	}
+
+	/**
+	 * Returns the path for the given job ID and key.
+	 *
+	 * <p>The returned path can be used with the state backend for recovery purposes.
+	 *
+	 * <p>This follows the same scheme as {@link #getStorageLocation(File, JobID, String)}.
+	 */
+	static String getRecoveryPath(String basePath, JobID jobId, String key) {
+		// format: $base/job_$id/blob_$key
+		return String.format("%s/%s/%s", basePath, JOB_DIR_PREFIX + jobId.toString(),
+				BLOB_FILE_PREFIX + encodeKey(key));
+	}
+
+	/**
+	 * Returns the path for the given job ID.
+	 *
+	 * <p>The returned path can be used with the state backend for recovery purposes.
+	 */
+	static String getRecoveryPath(String basePath, JobID jobId) {
+		return String.format("%s/%s", basePath, JOB_DIR_PREFIX + jobId.toString());
+	}
+
+	/**
+	 * Copies the file from the recovery path to the local file.
+	 */
+	static void copyFromRecoveryPath(String recoveryPath, File localBlobFile) throws Exception {
+		if (recoveryPath == null) {
+			throw new IllegalStateException("Failed to determine recovery path.");
+		}
+
+		if (!localBlobFile.createNewFile()) {
+			throw new IllegalStateException("Failed to create new local file to copy to");
+		}
+
+		URI uri = new URI(recoveryPath);
+		Path path = new Path(recoveryPath);
+
+		if (FileSystem.get(uri).exists(path)) {
+			try (InputStream is = FileSystem.get(uri).open(path)) {
+				FileOutputStream fos = new FileOutputStream(localBlobFile);
+				IOUtils.copyBytes(is, fos); // closes the streams
+			}
+		}
+		else {
+			throw new IOException("Cannot find required BLOB at '" + recoveryPath + "' for recovery.");
+		}
+	}
+
+	/**
 	 * Private constructor to prevent instantiation.
 	 */
 	private BlobUtils() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
new file mode 100644
index 0000000..8a037ad
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import com.google.common.io.Files;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Blob store backed by {@link FileSystem}.
+ */
+class FileSystemBlobStore implements BlobStore {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileSystemBlobStore.class);
+
+	/** The base path of the blob store */
+	private final String basePath;
+
+	FileSystemBlobStore(Configuration config) throws IOException {
+		StateBackend stateBackend = StateBackend.fromConfig(config);
+
+		if (stateBackend == StateBackend.FILESYSTEM) {
+			String stateBackendBasePath = config.getString(
+					ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
+
+			if (stateBackendBasePath.equals("")) {
+				throw new IllegalConfigurationException(String.format("Missing configuration for " +
+						"file system state backend recovery path. Please specify via " +
+						"'%s' key.", ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
+			}
+
+			stateBackendBasePath += "/blob";
+
+			this.basePath = stateBackendBasePath;
+
+			try {
+				FileSystem.get(new URI(basePath)).mkdirs(new Path(basePath));
+			}
+			catch (URISyntaxException e) {
+				throw new IOException(e);
+			}
+
+			LOG.info("Created blob directory {}.", basePath);
+		}
+		else {
+			// Nothing else support at the moment
+			throw new IllegalConfigurationException(
+					String.format("Illegal state backend " +
+									"configuration '%s'. Please configure '%s' as state " +
+									"backend and specify the recovery path via '%s' key.",
+							stateBackend, StateBackend.FILESYSTEM,
+							ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
+		}
+	}
+
+	// - Put ------------------------------------------------------------------
+
+	@Override
+	public void put(File localFile, BlobKey blobKey) throws Exception {
+		put(localFile, BlobUtils.getRecoveryPath(basePath, blobKey));
+	}
+
+	@Override
+	public void put(File localFile, JobID jobId, String key) throws Exception {
+		put(localFile, BlobUtils.getRecoveryPath(basePath, jobId, key));
+	}
+
+	private void put(File fromFile, String toBlobPath) throws Exception {
+		try (OutputStream os = FileSystem.get(new URI(toBlobPath))
+				.create(new Path(toBlobPath), true)) {
+
+			LOG.debug("Copying from {} to {}.", fromFile, toBlobPath);
+			Files.copy(fromFile, os);
+		}
+	}
+
+	// - Get ------------------------------------------------------------------
+
+	@Override
+	public void get(BlobKey blobKey, File localFile) throws Exception {
+		get(BlobUtils.getRecoveryPath(basePath, blobKey), localFile);
+	}
+
+	@Override
+	public void get(JobID jobId, String key, File localFile) throws Exception {
+		get(BlobUtils.getRecoveryPath(basePath, jobId, key), localFile);
+	}
+
+	private void get(String fromBlobPath, File toFile) throws Exception {
+		checkNotNull(fromBlobPath, "Blob path");
+		checkNotNull(toFile, "File");
+
+		if (!toFile.exists() && !toFile.createNewFile()) {
+			throw new IllegalStateException("Failed to create target file to copy to");
+		}
+
+		final URI fromUri = new URI(fromBlobPath);
+		final Path fromPath = new Path(fromBlobPath);
+
+		if (FileSystem.get(fromUri).exists(fromPath)) {
+			try (InputStream is = FileSystem.get(fromUri).open(fromPath)) {
+				FileOutputStream fos = new FileOutputStream(toFile);
+
+				LOG.debug("Copying from {} to {}.", fromBlobPath, toFile);
+				IOUtils.copyBytes(is, fos); // closes the streams
+			}
+		}
+		else {
+			throw new IOException(fromBlobPath + " does not exist.");
+		}
+	}
+
+	// - Delete ---------------------------------------------------------------
+
+	@Override
+	public void delete(BlobKey blobKey) {
+		delete(BlobUtils.getRecoveryPath(basePath, blobKey));
+	}
+
+	@Override
+	public void delete(JobID jobId, String key) {
+		delete(BlobUtils.getRecoveryPath(basePath, jobId, key));
+	}
+
+	@Override
+	public void deleteAll(JobID jobId) {
+		delete(BlobUtils.getRecoveryPath(basePath, jobId));
+	}
+
+	private void delete(String blobPath) {
+		try {
+			LOG.debug("Deleting {}.", blobPath);
+
+			FileSystem.get(new URI(blobPath)).delete(new Path(blobPath), true);
+		}
+		catch (Exception e) {
+			LOG.warn("Failed to delete blob at " + blobPath);
+		}
+	}
+
+	@Override
+	public void cleanUp() {
+		try {
+			LOG.debug("Cleaning up {}.", basePath);
+
+			FileSystem.get(new URI(basePath)).delete(new Path(basePath), true);
+		}
+		catch (Exception e) {
+			LOG.error("Failed to clean up recovery directory.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
new file mode 100644
index 0000000..1b71add
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.File;
+
+/**
+ * A {@link BlobStore} whose operations are all no-ops.
+ *
+ * <p>Nothing is ever written, read or deleted: {@code put} ignores the given file and
+ * {@code get} leaves the target file untouched. Presumably this is the store used when no
+ * recovery is configured -- confirm against the code that instantiates it.
+ */
+class VoidBlobStore implements BlobStore {
+
+	/** Does nothing; the file is not stored anywhere. */
+	@Override
+	public void put(File localFile, BlobKey blobKey) throws Exception {
+		// intentionally empty
+	}
+
+	/** Does nothing; the file is not stored anywhere. */
+	@Override
+	public void put(File localFile, JobID jobId, String key) throws Exception {
+		// intentionally empty
+	}
+
+	/** Does nothing; {@code localFile} is left untouched. */
+	@Override
+	public void get(BlobKey blobKey, File localFile) throws Exception {
+		// intentionally empty
+	}
+
+	/** Does nothing; {@code localFile} is left untouched. */
+	@Override
+	public void get(JobID jobId, String key, File localFile) throws Exception {
+		// intentionally empty
+	}
+
+	/** Does nothing; there is no stored blob to delete. */
+	@Override
+	public void delete(BlobKey blobKey) {
+		// intentionally empty
+	}
+
+	/** Does nothing; there is no stored blob to delete. */
+	@Override
+	public void delete(JobID jobId, String key) {
+		// intentionally empty
+	}
+
+	/** Does nothing; there are no stored blobs to delete. */
+	@Override
+	public void deleteAll(JobID jobId) {
+		// intentionally empty
+	}
+
+	/** Does nothing; there is no backing storage to clean up. */
+	@Override
+	public void cleanUp() {
+		// intentionally empty
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
index 17322d8..077e34d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
@@ -24,11 +24,11 @@ import org.apache.flink.configuration.Configuration;
 /**
  * Recovery mode for Flink's cluster execution. Currently supported modes are:
  *
- *   - Standalone: No recovery from JobManager failures
- *   - ZooKeeper: JobManager high availability via ZooKeeper
- *     ZooKeeper is used to select a leader among a group of JobManager. This JobManager
- *     is responsible for the job execution. Upon failure of the leader a new leader is elected
- *     which will take over the responsibilities of the old leader
+ * - Standalone: No recovery from JobManager failures
+ * - ZooKeeper: JobManager high availability via ZooKeeper
+ * ZooKeeper is used to select a leader among a group of JobManager. This JobManager
+ * is responsible for the job execution. Upon failure of the leader a new leader is elected
+ * which will take over the responsibilities of the old leader
  */
 public enum RecoveryMode {
 	STANDALONE,
@@ -69,4 +69,4 @@ public enum RecoveryMode {
 				return false;
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
new file mode 100644
index 0000000..0e324a8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class BlobRecoveryITCase {
+
+	private File recoveryDir;
+
+	@Before
+	public void setUp() throws Exception {
+		recoveryDir = new File(FileUtils.getTempDirectory(), "BlobRecoveryITCaseDir");
+		if (!recoveryDir.exists() && !recoveryDir.mkdirs()) {
+			throw new IllegalStateException("Failed to create temp directory for test");
+		}
+	}
+
+	@After
+	public void cleanUp() throws Exception {
+		if (recoveryDir != null) {
+			FileUtils.deleteDirectory(recoveryDir);
+		}
+	}
+
+	/**
+	 * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from
any
+	 * participating BlobServer.
+	 */
+	@Test
+	public void testBlobServerRecovery() throws Exception {
+		Random rand = new Random();
+
+		BlobServer[] server = new BlobServer[2];
+		InetSocketAddress[] serverAddress = new InetSocketAddress[2];
+		BlobClient client = null;
+
+		try {
+			Configuration config = new Configuration();
+			config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
+			config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+			config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, recoveryDir.getPath());
+
+			for (int i = 0; i < server.length; i++) {
+				server[i] = new BlobServer(config);
+				serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
+			}
+
+			client = new BlobClient(serverAddress[0]);
+
+			// Random data
+			byte[] expected = new byte[1024];
+			rand.nextBytes(expected);
+
+			BlobKey[] keys = new BlobKey[2];
+
+			// Put data
+			keys[0] = client.put(expected); // Request 1
+			keys[1] = client.put(expected, 32, 256); // Request 2
+
+			JobID[] jobId = new JobID[] { new JobID(), new JobID() };
+			String[] testKey = new String[] { "test-key-1", "test-key-2" };
+
+			client.put(jobId[0], testKey[0], expected); // Request 3
+			client.put(jobId[1], testKey[1], expected, 32, 256); // Request 4
+
+			// Close the client and connect to the other server
+			client.close();
+			client = new BlobClient(serverAddress[1]);
+
+			// Verify request 1
+			try (InputStream is = client.get(keys[0])) {
+				byte[] actual = new byte[expected.length];
+
+				BlobUtils.readFully(is, actual, 0, expected.length, null);
+
+				for (int i = 0; i < expected.length; i++) {
+					assertEquals(expected[i], actual[i]);
+				}
+			}
+
+			// Verify request 2
+			try (InputStream is = client.get(keys[1])) {
+				byte[] actual = new byte[256];
+				BlobUtils.readFully(is, actual, 0, 256, null);
+
+				for (int i = 32, j = 0; i < 256; i++, j++) {
+					assertEquals(expected[i], actual[j]);
+				}
+			}
+
+			// Verify request 3
+			try (InputStream is = client.get(jobId[0], testKey[0])) {
+				byte[] actual = new byte[expected.length];
+				BlobUtils.readFully(is, actual, 0, expected.length, null);
+
+				for (int i = 0; i < expected.length; i++) {
+					assertEquals(expected[i], actual[i]);
+				}
+			}
+
+			// Verify request 4
+			try (InputStream is = client.get(jobId[1], testKey[1])) {
+				byte[] actual = new byte[256];
+				BlobUtils.readFully(is, actual, 0, 256, null);
+
+				for (int i = 32, j = 0; i < 256; i++, j++) {
+					assertEquals(expected[i], actual[j]);
+				}
+			}
+		}
+		finally {
+			for (BlobServer s : server) {
+				if (s != null) {
+					s.shutdown();
+				}
+			}
+
+			if (client != null) {
+				client.close();
+			}
+		}
+
+		// Verify everything is clean
+		File[] recoveryFiles = recoveryDir.listFiles();
+		assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
new file mode 100644
index 0000000..4df8afb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution.librarycache;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobCache;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration test checking that libraries uploaded to one {@link BlobServer} can be
+ * downloaded through a {@link BlobLibraryCacheManager} whose {@link BlobCache} is connected
+ * to a second {@link BlobServer} configured with the same file system recovery path.
+ */
+public class BlobLibraryCacheRecoveryITCase {
+
+	/** Directory used as the file system recovery path of the blob servers under test. */
+	private File recoveryDir;
+
+	/** Creates the recovery directory below the system temp directory. */
+	@Before
+	public void setUp() throws Exception {
+		// Use a directory name of its own so this test cannot interfere with BlobRecoveryITCase
+		recoveryDir = new File(FileUtils.getTempDirectory(), "BlobLibraryCacheRecoveryITCaseDir");
+		if (!recoveryDir.exists() && !recoveryDir.mkdirs()) {
+			throw new IllegalStateException("Failed to create temp directory for test");
+		}
+	}
+
+	/** Removes the recovery directory and everything left in it. */
+	@After
+	public void cleanUp() throws Exception {
+		if (recoveryDir != null) {
+			FileUtils.deleteDirectory(recoveryDir);
+		}
+	}
+
+	/**
+	 * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any
+	 * participating BlobLibraryCacheManager.
+	 */
+	@Test
+	public void testRecoveryRegisterAndDownload() throws Exception {
+		Random rand = new Random();
+
+		BlobServer[] server = new BlobServer[2];
+		InetSocketAddress[] serverAddress = new InetSocketAddress[2];
+		BlobLibraryCacheManager[] libServer = new BlobLibraryCacheManager[2];
+		BlobCache cache = null;
+		BlobLibraryCacheManager libCache = null;
+
+		try {
+			Configuration config = new Configuration();
+			config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
+			config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+			config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, recoveryDir.getPath());
+
+			for (int i = 0; i < server.length; i++) {
+				server[i] = new BlobServer(config);
+				serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
+				libServer[i] = new BlobLibraryCacheManager(server[i], 3600 * 1000);
+			}
+
+			// Random data
+			byte[] expected = new byte[1024];
+			rand.nextBytes(expected);
+
+			List<BlobKey> keys = new ArrayList<>(2);
+
+			// Upload some data (libraries)
+			try (BlobClient client = new BlobClient(serverAddress[0])) {
+				keys.add(client.put(expected)); // Request 1
+				keys.add(client.put(expected, 32, 256)); // Request 2
+			}
+
+			// The cache
+			cache = new BlobCache(serverAddress[0], config);
+			libCache = new BlobLibraryCacheManager(cache, 3600 * 1000);
+
+			// Register uploaded libraries
+			JobID jobId = new JobID();
+			ExecutionAttemptID executionId = new ExecutionAttemptID();
+			libServer[0].registerTask(jobId, executionId, keys, Collections.<URL>emptyList());
+
+			// Verify key 1
+			File f = libCache.getFile(keys.get(0));
+			assertEquals(expected.length, f.length());
+
+			try (FileInputStream fis = new FileInputStream(f)) {
+				for (int i = 0; i < expected.length && fis.available() > 0; i++) {
+					assertEquals(expected[i], (byte) fis.read());
+				}
+
+				assertEquals(0, fis.available());
+			}
+
+			// Shutdown cache and start with other server
+			cache.shutdown();
+			libCache.shutdown();
+
+			cache = new BlobCache(serverAddress[1], config);
+			libCache = new BlobLibraryCacheManager(cache, 3600 * 1000);
+
+			// Verify key 1
+			f = libCache.getFile(keys.get(0));
+			assertEquals(expected.length, f.length());
+
+			try (FileInputStream fis = new FileInputStream(f)) {
+				for (int i = 0; i < expected.length && fis.available() > 0; i++) {
+					assertEquals(expected[i], (byte) fis.read());
+				}
+
+				assertEquals(0, fis.available());
+			}
+
+			// Verify key 2
+			f = libCache.getFile(keys.get(1));
+			assertEquals(256, f.length());
+
+			try (FileInputStream fis = new FileInputStream(f)) {
+				for (int i = 0; i < 256 && fis.available() > 0; i++) {
+					assertEquals(expected[32 + i], (byte) fis.read());
+				}
+
+				assertEquals(0, fis.available());
+			}
+		}
+		finally {
+			for (BlobServer s : server) {
+				if (s != null) {
+					s.shutdown();
+				}
+			}
+
+			if (cache != null) {
+				cache.shutdown();
+			}
+
+			if (libCache != null) {
+				libCache.shutdown();
+			}
+		}
+
+		// Verify everything is clean
+		File[] recoveryFiles = recoveryDir.listFiles();
+		if (recoveryFiles == null) {
+			// listFiles() returns null when the directory is gone or cannot be read
+			throw new IllegalStateException("Failed to list recovery directory " + recoveryDir);
+		}
+		assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
+	}
+}


Mime
View raw message