flink-commits mailing list archives

From: se...@apache.org
Subject: [2/3] flink git commit: [FLINK-1578] [BLOB manager] Improve failure handling and add more failure tests.
Date: Fri, 20 Feb 2015 12:26:45 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
new file mode 100644
index 0000000..0946d98
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketException;
+import java.security.MessageDigest;
+
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_ID_SCOPE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.NAME_ADDRESSABLE;
+import static org.apache.flink.runtime.blob.BlobUtils.closeSilently;
+import static org.apache.flink.runtime.blob.BlobUtils.readFully;
+import static org.apache.flink.runtime.blob.BlobUtils.readLength;
+import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
+
+import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.MAX_KEY_LENGTH;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
+
+/**
+ * A BLOB connection handles a series of requests from a particular BLOB client.
+ */
+class BlobServerConnection extends Thread {
+
+	/** The log object used for debugging. */
+	private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);
+
+	/** The socket to communicate with the client. */
+	private final Socket clientSocket;
+
+	/** The BLOB server. */
+	private final BlobServer blobServer;
+
+	/**
+	 * Creates a new BLOB connection for a client request.
+	 * 
+	 * @param clientSocket The socket to read/write data.
+	 * @param blobServer The BLOB server.
+	 */
+	BlobServerConnection(Socket clientSocket, BlobServer blobServer) {
+		super("BLOB connection for " + clientSocket.getRemoteSocketAddress().toString());
+		setDaemon(true);
+
+		if (blobServer == null) {
+			throw new NullPointerException();
+		}
+
+		this.clientSocket = clientSocket;
+		this.blobServer = blobServer;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Connection / Thread methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Main connection work method. Accepts requests until the other side closes the connection.
+	 */
+	@Override
+	public void run() {
+		try {
+			final InputStream inputStream = this.clientSocket.getInputStream();
+			final OutputStream outputStream = this.clientSocket.getOutputStream();
+			final byte[] buffer = new byte[BUFFER_SIZE];
+
+			while (true) {
+				// Read the requested operation
+				final int operation = inputStream.read();
+				if (operation < 0) {
+					// done, no one is asking anything from us
+					return;
+				}
+
+				switch (operation) {
+				case PUT_OPERATION:
+					put(inputStream, outputStream, buffer);
+					break;
+				case GET_OPERATION:
+					get(inputStream, outputStream, buffer);
+					break;
+				case DELETE_OPERATION:
+					delete(inputStream, outputStream, buffer);
+					break;
+				default:
+					throw new IOException("Unknown operation " + operation);
+				}
+			}
+		}
+		catch (SocketException e) {
+			// this happens when the remote side closes the connection
+			LOG.debug("Socket connection closed", e);
+		}
+		catch (Throwable t) {
+			LOG.error("Error while executing BLOB connection.", t);
+		}
+		finally {
+			try {
+				if (clientSocket != null) {
+					clientSocket.close();
+				}
+			} catch (Throwable t) {
+				LOG.debug("Exception while closing BLOB server connection socket.", t);
+			}
+
+			blobServer.unregisterConnection(this);
+		}
+	}
+
+	/**
+	 * Closes the connection socket and lets the thread exit.
+	 */
+	public void close() {
+		closeSilently(clientSocket, LOG);
+		interrupt();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Actions
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Handles an incoming GET request from a BLOB client.
+	 * 
+	 * @param inputStream
+	 *        the input stream to read incoming data from
+	 * @param outputStream
+	 *        the output stream to send data back to the client
+	 * @param buf
+	 *        an auxiliary buffer for data serialization/deserialization
+	 * @throws IOException
+	 *         thrown if an I/O error occurs while reading/writing data from/to the respective streams
+	 */
+	private void get(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
+
+		File blobFile;
+		try {
+			final int contentAdressable = inputStream.read();
+
+			if (contentAdressable < 0) {
+				throw new EOFException("Premature end of GET request");
+			}
+			if (contentAdressable == NAME_ADDRESSABLE) {
+				// Receive the job ID and key
+				byte[] jidBytes = new byte[JobID.SIZE];
+				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
+
+				JobID jobID = JobID.fromByteArray(jidBytes);
+				String key = readKey(buf, inputStream);
+				blobFile = this.blobServer.getStorageLocation(jobID, key);
+			}
+			else if (contentAdressable == CONTENT_ADDRESSABLE) {
+				final BlobKey key = BlobKey.readFromInputStream(inputStream);
+				blobFile = blobServer.getStorageLocation(key);
+			}
+			else {
+				throw new IOException("Unknown type of BLOB addressing.");
+			}
+
+			// Check if BLOB exists
+			if (!blobFile.exists()) {
+				throw new IOException("Cannot find required BLOB at " + blobFile.getAbsolutePath());
+			}
+			if (blobFile.length() > Integer.MAX_VALUE) {
+				throw new IOException("BLOB size exceeds the maximum size (2 GB).");
+			}
+
+			outputStream.write(RETURN_OKAY);
+
+			// up to here, an error can give a good message
+		}
+		catch (Throwable t) {
+			LOG.error("GET operation failed", t);
+			try {
+				writeErrorToStream(outputStream, t);
+			}
+			catch (IOException e) {
+				// since we are already in an exception case, it matters little that we could not send the error
+				// ignore this
+			}
+			clientSocket.close();
+			return;
+		}
+
+		// from here on, we started sending data, so all we can do is close the connection when something happens
+		try {
+			int blobLen = (int) blobFile.length();
+			writeLength(blobLen, outputStream);
+
+			FileInputStream fis = new FileInputStream(blobFile);
+			try {
+				int bytesRemaining = blobLen;
+				while (bytesRemaining > 0) {
+					int read = fis.read(buf);
+					if (read < 0) {
+						throw new IOException("Premature end of BLOB file stream for " + blobFile.getAbsolutePath());
+					}
+					outputStream.write(buf, 0, read);
+					bytesRemaining -= read;
+				}
+			} finally {
+				fis.close();
+			}
+		}
+		catch (SocketException e) {
+			// happens when the other side disconnects
+			LOG.debug("Socket connection closed", e);
+		}
+		catch (Throwable t) {
+			LOG.error("GET operation failed", t);
+			clientSocket.close();
+		}
+	}
+
+	/**
+	 * Handles an incoming PUT request from a BLOB client.
+	 * 
+	 * @param inputStream The input stream to read incoming data from.
+	 * @param outputStream The output stream to send data back to the client.
+	 * @param buf An auxiliary buffer for data serialization/deserialization.
+	 */
+	private void put(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
+		JobID jobID = null;
+		String key = null;
+		MessageDigest md = null;
+
+		File incomingFile = null;
+		FileOutputStream fos = null;
+
+		try {
+			final int contentAdressable = inputStream.read();
+			if (contentAdressable < 0) {
+				throw new EOFException("Premature end of PUT request");
+			}
+
+			if (contentAdressable == NAME_ADDRESSABLE) {
+				// Receive the job ID and key
+				byte[] jidBytes = new byte[JobID.SIZE];
+				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
+				jobID = JobID.fromByteArray(jidBytes);
+				key = readKey(buf, inputStream);
+			}
+			else if (contentAdressable == CONTENT_ADDRESSABLE) {
+				md = BlobUtils.createMessageDigest();
+			}
+			else {
+				throw new IOException("Unknown type of BLOB addressing.");
+			}
+
+			if (LOG.isDebugEnabled()) {
+				if (contentAdressable == NAME_ADDRESSABLE) {
+					LOG.debug(String.format("Received PUT request for BLOB under %s / \"%s\"", jobID, key));
+				} else {
+					LOG.debug("Received PUT request for content addressable BLOB");
+				}
+			}
+
+			incomingFile = blobServer.createTemporaryFilename();
+			fos = new FileOutputStream(incomingFile);
+
+			while (true) {
+				final int bytesExpected = readLength(inputStream);
+				if (bytesExpected == -1) {
+					// done
+					break;
+				}
+				if (bytesExpected > BUFFER_SIZE) {
+					throw new IOException("Unexpected number of incoming bytes: " + bytesExpected);
+				}
+
+				readFully(inputStream, buf, 0, bytesExpected, "buffer");
+				fos.write(buf, 0, bytesExpected);
+
+				if (md != null) {
+					md.update(buf, 0, bytesExpected);
+				}
+			}
+
+			fos.close();
+			fos = null;
+
+			if (contentAdressable == NAME_ADDRESSABLE) {
+				File storageFile = this.blobServer.getStorageLocation(jobID, key);
+				if (!incomingFile.renameTo(storageFile)) {
+					throw new IOException(String.format("Cannot move staging file %s to BLOB file %s",
+							incomingFile.getAbsolutePath(), storageFile.getAbsolutePath()));
+				}
+				incomingFile = null;
+				outputStream.write(RETURN_OKAY);
+			}
+			else {
+				BlobKey blobKey = new BlobKey(md.digest());
+				File storageFile = blobServer.getStorageLocation(blobKey);
+				if (!incomingFile.renameTo(storageFile)) {
+					throw new IOException(String.format("Cannot move staging file %s to BLOB file %s",
+							incomingFile.getAbsolutePath(), storageFile.getAbsolutePath()));
+				}
+				incomingFile = null;
+
+				// Return computed key to client for validation
+				outputStream.write(RETURN_OKAY);
+				blobKey.writeToOutputStream(outputStream);
+			}
+		}
+		catch (SocketException e) {
+			// happens when the other side disconnects
+			LOG.debug("Socket connection closed", e);
+		}
+		catch (Throwable t) {
+			LOG.error("PUT operation failed", t);
+			try {
+				writeErrorToStream(outputStream, t);
+			}
+			catch (IOException e) {
+				// since we are already in an exception case, it matters little that we could not send the error
+				// ignore this
+			}
+			clientSocket.close();
+		}
+		finally {
+			if (fos != null) {
+				try {
+					fos.close();
+				} catch (Throwable t) {
+					LOG.warn("Cannot close stream to BLOB staging file", t);
+				}
+			}
+			if (incomingFile != null) {
+				if (!incomingFile.delete()) {
+					LOG.warn("Cannot delete BLOB server staging file " + incomingFile.getAbsolutePath());
+				}
+			}
+		}
+	}
+
+	/**
+	 * Handles an incoming DELETE request from a BLOB client.
+	 * 
+	 * @param inputStream The input stream to read the request from.
+	 * @param outputStream The output stream to write the response to.
+	 * @throws java.io.IOException Thrown if an I/O error occurs while reading the request data from the input stream.
+	 */
+	private void delete(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
+
+		try {
+			int type = inputStream.read();
+			if (type < 0) {
+				throw new EOFException("Premature end of DELETE request");
+			}
+
+			if (type == CONTENT_ADDRESSABLE) {
+				BlobKey key = BlobKey.readFromInputStream(inputStream);
+				File blobFile = this.blobServer.getStorageLocation(key);
+				if (blobFile.exists() && !blobFile.delete()) {
+					throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
+				}
+			}
+			else if (type == NAME_ADDRESSABLE) {
+				byte[] jidBytes = new byte[JobID.SIZE];
+				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
+				JobID jobID = JobID.fromByteArray(jidBytes);
+
+				String key = readKey(buf, inputStream);
+
+				File blobFile = this.blobServer.getStorageLocation(jobID, key);
+				if (blobFile.exists() && !blobFile.delete()) {
+					throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
+				}
+			}
+			else if (type == JOB_ID_SCOPE) {
+				byte[] jidBytes = new byte[JobID.SIZE];
+				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
+				JobID jobID = JobID.fromByteArray(jidBytes);
+
+				blobServer.deleteJobDirectory(jobID);
+			}
+			else {
+				throw new IOException("Unrecognized addressing type: " + type);
+			}
+
+			outputStream.write(RETURN_OKAY);
+		}
+		catch (Throwable t) {
+			LOG.error("DELETE operation failed", t);
+			try {
+				writeErrorToStream(outputStream, t);
+			}
+			catch (IOException e) {
+				// since we are already in an exception case, it matters little that we could not send the error
+				// ignore this
+			}
+			clientSocket.close();
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Utilities
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Reads the key of a BLOB from the given input stream.
+	 * 
+	 * @param buf
+	 *        auxiliary buffer for data deserialization
+	 * @param inputStream
+	 *        the input stream to read the key from
+	 * @return the key of a BLOB
+	 * @throws IOException
+	 *         thrown if an I/O error occurs while reading the key data from the input stream
+	 */
+	private static String readKey(byte[] buf, InputStream inputStream) throws IOException {
+		final int keyLength = readLength(inputStream);
+		if (keyLength > MAX_KEY_LENGTH) {
+			throw new IOException("Unexpected key length " + keyLength);
+		}
+
+		readFully(inputStream, buf, 0, keyLength, "BlobKey");
+		return new String(buf, 0, keyLength, BlobUtils.DEFAULT_CHARSET);
+	}
+
+	/**
+	 * Writes to the output stream the error return code, and the given exception in serialized form.
+	 *
+	 * @param out The output stream to write to.
+	 * @param t The exception to send.
+	 * @throws IOException Thrown, if the output stream could not be written to.
+	 */
+	private static void writeErrorToStream(OutputStream out, Throwable t) throws IOException {
+		byte[] bytes = InstantiationUtil.serializeObject(t);
+		out.write(RETURN_ERROR);
+		writeLength(bytes.length, out);
+		out.write(bytes);
+	}
+}
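
For orientation, the following is a minimal, hypothetical sketch of the client side of a name-addressable GET exchange as handled by BlobServerConnection#get above: one operation byte, one addressing byte, the raw job ID bytes, a length-prefixed key, then a status byte followed by the length-prefixed BLOB content in the response. It is not part of this commit, the class name and the UTF-8 key encoding are assumptions, and the actual client logic lives in BlobClient.

// Hypothetical client-side sketch of a name-addressable GET request.
// Not part of this commit; shown only to illustrate the wire protocol above.
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

import org.apache.flink.runtime.jobgraph.JobID;

class NameAddressableGetSketch {

	static byte[] getBlob(String host, int port, JobID jobID, String key) throws IOException {
		try (Socket socket = new Socket(host, port)) {
			OutputStream out = socket.getOutputStream();
			InputStream in = socket.getInputStream();

			// request: operation byte, addressing byte, job ID, length-prefixed key
			out.write(1);                            // GET_OPERATION
			out.write(1);                            // NAME_ADDRESSABLE
			out.write(jobID.getBytes());             // raw job ID bytes
			byte[] keyBytes = key.getBytes("UTF-8"); // key charset is an assumption here
			writeLength(keyBytes.length, out);
			out.write(keyBytes);

			// response: status byte, then the length-prefixed BLOB content
			int status = in.read();
			if (status != 0) {                       // 0 == RETURN_OKAY
				throw new IOException("Server signaled an error for the GET request");
			}
			int len = readLength(in);
			byte[] content = new byte[len];
			int pos = 0;
			while (pos < len) {
				int read = in.read(content, pos, len - pos);
				if (read < 0) {
					throw new EOFException("Premature end of BLOB content");
				}
				pos += read;
			}
			return content;
		}
	}

	// 4-byte little-endian length, mirroring BlobUtils.writeLength / readLength
	private static void writeLength(int length, OutputStream out) throws IOException {
		out.write(length & 0xff);
		out.write((length >> 8) & 0xff);
		out.write((length >> 16) & 0xff);
		out.write((length >> 24) & 0xff);
	}

	private static int readLength(InputStream in) throws IOException {
		int b0 = in.read(), b1 = in.read(), b2 = in.read(), b3 = in.read();
		if ((b0 | b1 | b2 | b3) < 0) {
			throw new EOFException("Read an incomplete length");
		}
		return (b0 & 0xff) | ((b1 & 0xff) << 8) | ((b2 & 0xff) << 16) | ((b3 & 0xff) << 24);
	}
}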

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
new file mode 100644
index 0000000..6df7811
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.blob;
+
+public class BlobServerProtocol {
+
+	// --------------------------------------------------------------------------------------------
+	//  Constants used in the protocol of the BLOB store
+	// --------------------------------------------------------------------------------------------
+
+	/** The buffer size in bytes for network transfers. */
+	static final int BUFFER_SIZE = 65536; // 64 K
+
+	/** The maximum key length allowed for storing BLOBs. */
+	static final int MAX_KEY_LENGTH = 64;
+
+	/** Internal code to identify a PUT operation. */
+	static final byte PUT_OPERATION = 0;
+
+	/** Internal code to identify a GET operation. */
+	static final byte GET_OPERATION = 1;
+
+	/** Internal code to identify a DELETE operation. */
+	static final byte DELETE_OPERATION = 2;
+
+	/** Internal code to identify a successful operation. */
+	static final byte RETURN_OKAY = 0;
+
+	/** Internal code to identify an erroneous operation. */
+	static final byte RETURN_ERROR = 1;
+
+	/** Internal code to identify a reference via content hash as the key. */
+	static final byte CONTENT_ADDRESSABLE = 0;
+
+	/** Internal code to identify a reference via jobId and name as the key. */
+	static final byte NAME_ADDRESSABLE = 1;
+
+	/** Internal code to identify a reference via jobId as the key. */
+	static final byte JOB_ID_SCOPE = 2;
+
+	// --------------------------------------------------------------------------------------------
+
+	private BlobServerProtocol() {}
+}
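
As an illustration of how these constants combine on the wire, here is a hypothetical sketch of the chunked payload framing used by PUT requests (see BlobServerConnection#put): each chunk is announced by a 4-byte little-endian length of at most BUFFER_SIZE, followed by that many payload bytes, and a length of -1 terminates the upload. This sketch is not part of the commit.

// Hypothetical sketch of the chunked PUT payload framing; not part of this commit.
import java.io.IOException;
import java.io.OutputStream;

class PutFramingSketch {

	private static final int BUFFER_SIZE = 65536; // mirrors BlobServerProtocol.BUFFER_SIZE

	static void writeChunked(byte[] data, OutputStream out) throws IOException {
		int pos = 0;
		while (pos < data.length) {
			int chunk = Math.min(BUFFER_SIZE, data.length - pos);
			writeLength(chunk, out);         // length of the next chunk
			out.write(data, pos, chunk);     // chunk payload
			pos += chunk;
		}
		writeLength(-1, out);                // end-of-data marker
	}

	// 4-byte little-endian length, mirroring BlobUtils.writeLength
	private static void writeLength(int length, OutputStream out) throws IOException {
		out.write(length & 0xff);
		out.write((length >> 8) & 0xff);
		out.write((length >> 16) & 0xff);
		out.write((length >> 24) & 0xff);
	}
}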

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
index 148476f..a2400b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
@@ -21,27 +21,32 @@ package org.apache.flink.runtime.blob;
 import java.io.IOException;
 import java.net.URL;
 
+/**
+ * A simple service to store and retrieve binary large objects (BLOBs).
+ */
 public interface BlobService {
+
 	/**
 	 * This method returns the URL of the file associated with the provided blob key.
 	 *
-	 * @param requiredBlob blob key associated with the requested file
-	 * @return URL of the file
+	 * @param key blob key associated with the requested file
+	 * @return The URL to the file.
 	 * @throws IOException
 	 */
-	URL getURL(final BlobKey requiredBlob) throws IOException;
+	URL getURL(BlobKey key) throws IOException;
+
 
 	/**
 	 * This method deletes the file associated with the provided blob key.
 	 *
-	 * @param blobKey associated with the file to be deleted
+	 * @param key associated with the file to be deleted
 	 * @throws IOException
 	 */
-	void delete(final BlobKey blobKey) throws IOException;
+	void delete(BlobKey key) throws IOException;
 
 	/**
-	 * Returns the port of the blob service
-	 * @return the port of the blob service
+	 * Returns the port of the blob service.
+	 * @return the port of the blob service.
 	 */
 	int getPort();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 53cab1c..5db5ef6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -23,8 +23,12 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.slf4j.Logger;
 
+import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
 import java.nio.charset.Charset;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
@@ -90,10 +94,13 @@ public class BlobUtils {
 	 * @return the BLOB server's directory for incoming files
 	 */
 	static File getIncomingDirectory(File storageDir) {
-		final File incomingDirectory = new File(storageDir, "incoming");
-		incomingDirectory.mkdir();
+		final File incomingDir = new File(storageDir, "incoming");
 
-		return incomingDirectory;
+		if (!incomingDir.exists() && !incomingDir.mkdir()) {
+			throw new RuntimeException("Cannot create directory for incoming files " + incomingDir.getAbsolutePath());
+		}
+
+		return incomingDir;
 	}
 
 	/**
@@ -106,7 +113,7 @@ public class BlobUtils {
 		final File cacheDirectory = new File(storageDir, "cache");
 
 		if (!cacheDirectory.exists() && !cacheDirectory.mkdir()) {
-			throw new RuntimeException("Could not create cache directory '" + cacheDirectory + "'.");
+			throw new RuntimeException("Could not create cache directory '" + cacheDirectory.getAbsolutePath() + "'.");
 		}
 
 		return cacheDirectory;
@@ -119,7 +126,7 @@ public class BlobUtils {
 	 *        the key identifying the BLOB
 	 * @return the (designated) physical storage location of the BLOB
 	 */
-	static File getStorageLocation(final File storageDir,  final BlobKey key) {
+	static File getStorageLocation(File storageDir, BlobKey key) {
 		return new File(getCacheDirectory(storageDir), BLOB_FILE_PREFIX + key.toString());
 	}
 
@@ -132,7 +139,7 @@ public class BlobUtils {
 	 *        the key of the BLOB
 	 * @return the (designated) physical storage location of the BLOB with the given job ID and key
 	 */
-	static File getStorageLocation(final File storageDir, final JobID jobID, final String key) {
+	static File getStorageLocation(File storageDir, JobID jobID, String key) {
 		return new File(getJobDirectory(storageDir, jobID), BLOB_FILE_PREFIX + encodeKey(key));
 	}
 
@@ -143,9 +150,12 @@ public class BlobUtils {
 	 *        the ID of the job to return the storage directory for
 	 * @return the storage directory for BLOBs belonging to the job with the given ID
 	 */
-	private static File getJobDirectory(final File storageDir, final JobID jobID){
+	private static File getJobDirectory(File storageDir, JobID jobID) {
 		final File jobDirectory = new File(storageDir, JOB_DIR_PREFIX + jobID.toString());
-		jobDirectory.mkdirs();
+
+		if (!jobDirectory.exists() && !jobDirectory.mkdirs()) {
+			throw new RuntimeException("Could not create jobId directory '" + jobDirectory.getAbsolutePath() + "'.");
+		}
 
 		return jobDirectory;
 	}
@@ -157,8 +167,7 @@ public class BlobUtils {
 	 *        the user's key for a BLOB
 	 * @return the internal name for the BLOB as used by the BLOB server
 	 */
-	private static String encodeKey(final String key) {
-
+	private static String encodeKey(String key) {
 		return BaseEncoding.base64().encode(key.getBytes(DEFAULT_CHARSET));
 	}
 
@@ -168,9 +177,8 @@ public class BlobUtils {
 	 * @param jobID
 	 *			jobID whose directory shall be deleted
 	 */
-	static void deleteJobDirectory(final File storageDir, final JobID jobID) throws IOException {
+	static void deleteJobDirectory(File storageDir, JobID jobID) throws IOException {
 		File directory = getJobDirectory(storageDir, jobID);
-
 		FileUtils.deleteDirectory(directory);
 	}
 
@@ -220,4 +228,94 @@ public class BlobUtils {
 			return null;
 		}
 	}
+
+	/**
+	 * Auxiliary method to write the length of an upcoming data chunk to an
+	 * output stream.
+	 *
+	 * @param length
+	 *        the length of the upcoming data chunk in bytes
+	 * @param outputStream
+	 *        the output stream to write the length to
+	 * @throws IOException
+	 *         thrown if an I/O error occurs while writing to the output
+	 *         stream
+	 */
+	static void writeLength(int length, OutputStream outputStream) throws IOException {
+		byte[] buf = new byte[4];
+		buf[0] = (byte) (length & 0xff);
+		buf[1] = (byte) ((length >> 8) & 0xff);
+		buf[2] = (byte) ((length >> 16) & 0xff);
+		buf[3] = (byte) ((length >> 24) & 0xff);
+		outputStream.write(buf, 0, 4);
+	}
+
+	/**
+	 * Auxiliary method to read the length of an upcoming data chunk from an
+	 * input stream.
+	 *
+	 * @param inputStream
+	 *        the input stream to read the length from
+	 * @return the length of the upcoming data chunk in bytes
+	 * @throws IOException
+	 *         thrown if an I/O error occurs while reading from the input
+	 *         stream
+	 */
+	static int readLength(InputStream inputStream) throws IOException {
+		byte[] buf = new byte[4];
+		int bytesRead = 0;
+		while (bytesRead < 4) {
+			final int read = inputStream.read(buf, bytesRead, 4 - bytesRead);
+			if (read < 0) {
+				throw new EOFException("Read an incomplete length");
+			}
+			bytesRead += read;
+		}
+
+		bytesRead = buf[0] & 0xff;
+		bytesRead |= (buf[1] & 0xff) << 8;
+		bytesRead |= (buf[2] & 0xff) << 16;
+		bytesRead |= (buf[3] & 0xff) << 24;
+
+		return bytesRead;
+	}
+
+	/**
+	 * Auxiliary method to read a particular number of bytes from an input stream. This method blocks until the
+	 * requested number of bytes have been read from the stream. If the stream cannot offer enough data, an
+	 * {@link EOFException} is thrown.
+	 *
+	 * @param inputStream The input stream to read the data from.
+	 * @param buf The buffer to store the read data.
+	 * @param off The offset inside the buffer.
+	 * @param len The number of bytes to read from the stream.
+	 * @param type The name of the type, used to produce a meaningful error message if there is not enough data.
+	 * @throws IOException
+	 *         Thrown if an I/O error occurs while reading from the stream or the stream cannot offer enough data.
+	 */
+	static void readFully(InputStream inputStream, byte[] buf, int off, int len, String type) throws IOException {
+
+		int bytesRead = 0;
+		while (bytesRead < len) {
+
+			final int read = inputStream.read(buf, off + bytesRead, len - bytesRead);
+			if (read < 0) {
+				throw new EOFException("Received an incomplete " + type);
+			}
+			bytesRead += read;
+		}
+	}
+
+	static void closeSilently(Socket socket, Logger LOG) {
+		if (socket != null) {
+			try {
+				socket.close();
+			} catch (Throwable t) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Error while closing resource after BLOB transfer.", t);
+				}
+			}
+		}
+	}
 }
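
A small, illustrative round-trip of the new length framing helpers (writeLength / readLength are package-private, so such a snippet would have to live in org.apache.flink.runtime.blob): for example, 0x12345678 is encoded as the byte sequence 78 56 34 12. The class below is not part of this commit.

// Illustrative round-trip of BlobUtils.writeLength / readLength; not part of this commit.
package org.apache.flink.runtime.blob;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

class LengthFramingRoundTrip {

	static void demo() throws IOException {
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		BlobUtils.writeLength(0x12345678, out);

		byte[] encoded = out.toByteArray();  // {0x78, 0x56, 0x34, 0x12} - little-endian

		int decoded = BlobUtils.readLength(new ByteArrayInputStream(encoded));
		if (decoded != 0x12345678) {
			throw new AssertionError("round-trip mismatch");
		}
	}
}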

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/AbstractIDTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/AbstractIDTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/AbstractIDTest.java
index 7f7575b..ba9add5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/AbstractIDTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/AbstractIDTest.java
@@ -22,10 +22,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import org.apache.flink.runtime.jobgraph.JobID;
 import org.junit.Test;
 
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 
+import java.nio.ByteBuffer;
+
 /**
  * This class contains tests for the {@link org.apache.flink.runtime.AbstractID} class.
  */
@@ -48,6 +51,45 @@ public class AbstractIDTest {
 			fail(e.getMessage());
 		}
 	}
+
+	@Test
+	public void testConvertToBytes() {
+		try {
+			AbstractID origID = new AbstractID();
+
+			AbstractID copy1 = new AbstractID(origID);
+			AbstractID copy2 = new AbstractID(origID.getBytes());
+			AbstractID copy3 = new AbstractID(origID.getLowerPart(), origID.getUpperPart());
+
+			assertEquals(origID, copy1);
+			assertEquals(origID, copy2);
+			assertEquals(origID, copy3);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testConvertToByteBuffer() {
+		try {
+			JobID origID = new JobID();
+
+			byte[] bytes = origID.getBytes();
+			ByteBuffer buffer = ByteBuffer.wrap(bytes);
+
+			JobID copy1 = JobID.fromByteBuffer(buffer);
+			JobID copy2 = JobID.fromByteArray(bytes);
+
+			assertEquals(origID, copy1);
+			assertEquals(origID, copy2);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 	
 	@Test
 	public void testCompare() {

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
new file mode 100644
index 0000000..aba0aff
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.URL;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for the blob cache retrying the connection to the server.
+ */
+public class BlobCacheRetriesTest {
+
+	/**
+	 * A test where the connection fails twice and then the get operation succeeds.
+	 */
+	@Test
+	public void testBlobFetchRetries() {
+
+		final byte[] data = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
+
+		BlobServer server = null;
+		BlobCache cache = null;
+		try {
+			final Configuration config = new Configuration();
+
+			server = new TestingFailingBlobServer(config, 2);
+
+			final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+
+			// upload some blob
+			BlobClient blobClient = null;
+			BlobKey key;
+			try {
+				blobClient = new BlobClient(serverAddress);
+
+				key = blobClient.put(data);
+			}
+			finally {
+				if (blobClient != null) {
+					blobClient.close();
+				}
+			}
+
+			cache = new BlobCache(serverAddress, config);
+
+			// trigger a download - it should fail a couple of times, but retry and eventually succeed
+			URL url = cache.getURL(key);
+			InputStream is = url.openStream();
+			try {
+				byte[] received = new byte[data.length];
+				assertEquals(data.length, is.read(received));
+				assertArrayEquals(data, received);
+			}
+			finally {
+				is.close();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (cache != null) {
+				cache.shutdown();
+			}
+			if (server != null) {
+				server.shutdown();
+			}
+		}
+	}
+
+	/**
+	 * A test where the connection fails too often and eventually fails the GET request.
+	 */
+	@Test
+	public void testBlobFetchWithTooManyFailures() {
+
+		final byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
+
+		BlobServer server = null;
+		BlobCache cache = null;
+		try {
+			final Configuration config = new Configuration();
+
+			server = new TestingFailingBlobServer(config, 10);
+
+			final InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+
+			// upload some blob
+			BlobClient blobClient = null;
+			BlobKey key;
+			try {
+				blobClient = new BlobClient(serverAddress);
+
+				key = blobClient.put(data);
+			}
+			finally {
+				if (blobClient != null) {
+					blobClient.close();
+				}
+			}
+
+			cache = new BlobCache(serverAddress, config);
+
+			// trigger a download - it should fail eventually
+			try {
+				cache.getURL(key);
+				fail("This should fail");
+			}
+			catch (IOException e) {
+				// as we expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (cache != null) {
+				cache.shutdown();
+			}
+			if (server != null) {
+				server.shutdown();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
new file mode 100644
index 0000000..4b92b71
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+/**
+ * This class contains unit tests for the {@link BlobCache}.
+ */
+public class BlobCacheSuccessTest {
+
+	@Test
+	public void testBlobCache() {
+
+		// First create two BLOBs and upload them to BLOB server
+		final byte[] buf = new byte[128];
+		final List<BlobKey> blobKeys = new ArrayList<BlobKey>(2);
+
+		BlobServer blobServer = null;
+		BlobCache blobCache = null;
+		try {
+
+			// Start the BLOB server
+			blobServer = new BlobServer(new Configuration());
+			final InetSocketAddress serverAddress = new InetSocketAddress(blobServer.getPort());
+
+			// Upload BLOBs
+			BlobClient blobClient = null;
+			try {
+
+				blobClient = new BlobClient(serverAddress);
+
+				blobKeys.add(blobClient.put(buf));
+				buf[0] = 1; // Make sure the BLOB key changes
+				blobKeys.add(blobClient.put(buf));
+			} finally {
+				if (blobClient != null) {
+					blobClient.close();
+				}
+			}
+
+			blobCache = new BlobCache(serverAddress, new Configuration());
+
+			for(int i = 0; i < blobKeys.size(); i++){
+				blobCache.getURL(blobKeys.get(i));
+			}
+
+			// Now, shut down the BLOB server, the BLOBs must still be accessible through the cache.
+			blobServer.shutdown();
+			blobServer = null;
+
+			final URL[] urls = new URL[blobKeys.size()];
+
+			for(int i = 0; i < blobKeys.size(); i++){
+				urls[i] = blobCache.getURL(blobKeys.get(i));
+			}
+
+			// Verify the result
+			assertEquals(blobKeys.size(), urls.length);
+
+			for (int i = 0; i < urls.length; ++i) {
+
+				final URL url = urls[i];
+
+				assertNotNull(url);
+
+				try {
+					final File cachedFile = new File(url.toURI());
+
+					assertTrue(cachedFile.exists());
+					assertEquals(buf.length, cachedFile.length());
+
+				} catch (URISyntaxException e) {
+					fail(e.getMessage());
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (blobServer != null) {
+				blobServer.shutdown();
+			}
+
+			if(blobCache != null){
+				blobCache.shutdown();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java
deleted file mode 100644
index 32c8c3a..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.blob;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.configuration.Configuration;
-import org.junit.Test;
-
-/**
- * This class contains unit tests for the {@link BlobCache}.
- */
-public class BlobCacheTest {
-
-	@Test
-	public void testBlobCache() {
-
-		// First create two BLOBs and upload them to BLOB server
-		final byte[] buf = new byte[128];
-		final List<BlobKey> blobKeys = new ArrayList<BlobKey>(2);
-
-		BlobServer blobServer = null;
-		BlobCache blobCache = null;
-		try {
-
-			// Start the BLOB server
-			blobServer = new BlobServer(new Configuration());
-			final InetSocketAddress serverAddress = new InetSocketAddress(blobServer.getServerPort());
-
-			// Upload BLOBs
-			BlobClient blobClient = null;
-			try {
-
-				blobClient = new BlobClient(serverAddress);
-
-				blobKeys.add(blobClient.put(buf));
-				buf[0] = 1; // Make sure the BLOB key changes
-				blobKeys.add(blobClient.put(buf));
-			} finally {
-				if (blobClient != null) {
-					blobClient.close();
-				}
-			}
-
-			blobCache = new BlobCache(serverAddress, new Configuration());
-
-			for(int i = 0; i < blobKeys.size(); i++){
-				blobCache.getURL(blobKeys.get(i));
-			}
-
-			// Now, shut down the BLOB server, the BLOBs must still be accessible through the cache.
-			blobServer.shutdown();
-			blobServer = null;
-
-			final URL[] urls = new URL[blobKeys.size()];
-
-			for(int i = 0; i < blobKeys.size(); i++){
-				urls[i] = blobCache.getURL(blobKeys.get(i));
-			}
-
-			// Verify the result
-			assertEquals(blobKeys.size(), urls.length);
-
-			for (int i = 0; i < urls.length; ++i) {
-
-				final URL url = urls[i];
-
-				assertNotNull(url);
-
-				try {
-					final File cachedFile = new File(url.toURI());
-
-					assertTrue(cachedFile.exists());
-					assertEquals(buf.length, cachedFile.length());
-
-				} catch (URISyntaxException e) {
-					fail(e.getMessage());
-				}
-
-			}
-
-		} catch (IOException ioe) {
-			fail(ioe.getMessage());
-		} finally {
-			if (blobServer != null) {
-				blobServer.shutdown();
-			}
-
-			if(blobCache != null){
-				blobCache.shutdown();
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 1465777..2254d7c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.fail;
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -33,7 +32,6 @@ import java.security.MessageDigest;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.util.StringUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -43,14 +41,10 @@ import org.junit.Test;
  */
 public class BlobClientTest {
 
-	/**
-	 * The buffer size used during the tests in bytes.
-	 */
+	/** The buffer size used during the tests in bytes. */
 	private static final int TEST_BUFFER_SIZE = 17 * 1000;
 
-	/**
-	 * The instance of the BLOB server used during the tests.
-	 */
+	/** The instance of the BLOB server used during the tests. */
 	private static BlobServer BLOB_SERVER;
 
 	/**
@@ -60,10 +54,11 @@ public class BlobClientTest {
 	public static void startServer() {
 		try {
 			BLOB_SERVER = new BlobServer(new Configuration());
-		} catch (IOException ioe) {
-			fail(StringUtils.stringifyException(ioe));
 		}
-
+		catch (IOException e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 
 	/**
@@ -82,13 +77,10 @@ public class BlobClientTest {
 	 * @return a test buffer filled with a specific byte pattern
 	 */
 	private static byte[] createTestBuffer() {
-
 		final byte[] buf = new byte[TEST_BUFFER_SIZE];
-
 		for (int i = 0; i < buf.length; ++i) {
 			buf[i] = (byte) (i % 128);
 		}
-
 		return buf;
 	}
 
@@ -102,7 +94,7 @@ public class BlobClientTest {
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while writing to the test file
 	 */
-	private static BlobKey prepareTestFile(final File file) throws IOException {
+	private static BlobKey prepareTestFile(File file) throws IOException {
 
 		MessageDigest md = BlobUtils.createMessageDigest();
 
@@ -203,44 +195,44 @@ public class BlobClientTest {
 	@Test
 	public void testContentAddressableBuffer() {
 
-		final byte[] testBuffer = createTestBuffer();
-		final MessageDigest md = BlobUtils.createMessageDigest();
-		md.update(testBuffer);
-		final BlobKey origKey = new BlobKey(md.digest());
+		BlobClient client = null;
 
 		try {
+			byte[] testBuffer = createTestBuffer();
+			MessageDigest md = BlobUtils.createMessageDigest();
+			md.update(testBuffer);
+			BlobKey origKey = new BlobKey(md.digest());
 
-			BlobClient client = null;
-			try {
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort());
+			client = new BlobClient(serverAddress);
 
-				final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getServerPort());
-				client = new BlobClient(serverAddress);
+			// Store the data
+			BlobKey receivedKey = client.put(testBuffer);
+			assertEquals(origKey, receivedKey);
 
-				// Store the data
-				final BlobKey receivedKey = client.put(testBuffer);
-				assertEquals(origKey, receivedKey);
+			// Retrieve the data
+			InputStream is = client.get(receivedKey);
+			validateGet(is, testBuffer);
 
-				// Retrieve the data
-				final InputStream is = client.get(receivedKey);
-				validateGet(is, testBuffer);
-
-				// Check reaction to invalid keys
+			// Check reaction to invalid keys
+			try {
+				client.get(new BlobKey());
+				fail("Expected IOException did not occur");
+			}
+			catch (IOException fnfe) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (client != null) {
 				try {
-					client.get(new BlobKey());
-				} catch (FileNotFoundException fnfe) {
-					return;
-				}
-
-				fail("Expected FileNotFoundException did not occur");
-
-			} finally {
-				if (client != null) {
 					client.close();
-				}
+				} catch (Throwable t) {}
 			}
-
-		} catch (IOException ioe) {
-			fail(StringUtils.stringifyException(ioe));
 		}
 	}
 
@@ -250,42 +242,45 @@ public class BlobClientTest {
 	@Test
 	public void testContentAddressableStream() {
 
-		try {
+		BlobClient client = null;
+		InputStream is = null;
 
-			final File testFile = File.createTempFile("testfile", ".dat");
+		try {
+			File testFile = File.createTempFile("testfile", ".dat");
 			testFile.deleteOnExit();
-			final BlobKey origKey = prepareTestFile(testFile);
 
-			BlobClient client = null;
-			InputStream is = null;
-			try {
-
-				final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getServerPort());
-				client = new BlobClient(serverAddress);
+			BlobKey origKey = prepareTestFile(testFile);
 
-				// Store the data
-				is = new FileInputStream(testFile);
-				final BlobKey receivedKey = client.put(is);
-				assertEquals(origKey, receivedKey);
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort());
+			client = new BlobClient(serverAddress);
 
-				is.close();
-				is = null;
+			// Store the data
+			is = new FileInputStream(testFile);
+			BlobKey receivedKey = client.put(is);
+			assertEquals(origKey, receivedKey);
 
-				// Retrieve the data
-				is = client.get(receivedKey);
-				validateGet(is, testFile);
+			is.close();
+			is = null;
 
-			} finally {
-				if (is != null) {
+			// Retrieve the data
+			is = client.get(receivedKey);
+			validateGet(is, testFile);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (is != null) {
+				try {
 					is.close();
-				}
-				if (client != null) {
+				} catch (Throwable t) {}
+			}
+			if (client != null) {
+				try {
 					client.close();
-				}
+				} catch (Throwable t) {}
 			}
-
-		} catch (IOException ioe) {
-			fail(StringUtils.stringifyException(ioe));
 		}
 	}
 
@@ -300,11 +295,9 @@ public class BlobClientTest {
 		final String key = "testkey";
 
 		try {
-
 			BlobClient client = null;
 			try {
-
-				final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getServerPort());
+				final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort());
 				client = new BlobClient(serverAddress);
 
 				// Store the data
@@ -320,20 +313,21 @@ public class BlobClientTest {
 				// Check if the BLOB is still available
 				try {
 					client.get(jobID, key);
-				} catch (FileNotFoundException fnfe) {
-					return;
+					fail("Expected IOException did not occur");
 				}
-
-				fail("Expected FileNotFoundException did not occur");
-
-			} finally {
+				catch (IOException e) {
+					// expected
+				}
+			}
+			finally {
 				if (client != null) {
 					client.close();
 				}
 			}
-
-		} catch (IOException ioe) {
-			fail(StringUtils.stringifyException(ioe));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
 		}
 	}
 
@@ -355,7 +349,7 @@ public class BlobClientTest {
 			InputStream is = null;
 			try {
 
-				final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getServerPort());
+				final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort());
 				client = new BlobClient(serverAddress);
 
 				// Store the data
@@ -369,7 +363,8 @@ public class BlobClientTest {
 				is = client.get(jobID, key);
 				validateGet(is, testFile);
 
-			} finally {
+			}
+			finally {
 				if (is != null) {
 					is.close();
 				}
@@ -378,8 +373,10 @@ public class BlobClientTest {
 				}
 			}
 
-		} catch (IOException ioe) {
-			fail(StringUtils.stringifyException(ioe));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
new file mode 100644
index 0000000..adb3bfc
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Random;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests how DELETE requests behave.
+ */
+public class BlobServerDeleteTest {
+
+	private final Random rnd = new Random();
+
+	@Test
+	public void testDeleteSingle() {
+		BlobServer server = null;
+		BlobClient client = null;
+
+		try {
+			Configuration config = new Configuration();
+			server = new BlobServer(config);
+
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+			client = new BlobClient(serverAddress);
+
+			byte[] data = new byte[2000000];
+			rnd.nextBytes(data);
+
+			// put content addressable (like libraries)
+			BlobKey key = client.put(data);
+			assertNotNull(key);
+
+			// issue a DELETE request
+			client.delete(key);
+			client.close();
+
+			client = new BlobClient(serverAddress);
+			try {
+				client.get(key);
+				fail("BLOB should have been deleted");
+			}
+			catch (IOException e) {
+				// expected
+			}
+
+			try {
+				client.put(new byte[1]);
+				fail("client should be closed after erroneous operation");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (client != null) {
+				try {
+					client.close();
+				} catch (Throwable t) {
+					t.printStackTrace();
+				}
+			}
+			if (server != null) {
+				server.shutdown();
+			}
+		}
+	}
+
+	@Test
+	public void testDeleteAll() {
+		BlobServer server = null;
+		BlobClient client = null;
+
+		try {
+			Configuration config = new Configuration();
+			server = new BlobServer(config);
+
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+			client = new BlobClient(serverAddress);
+
+			byte[] data = new byte[2000000];
+			rnd.nextBytes(data);
+
+			JobID jobID = new JobID();
+			String name1 = "random name";
+			String name2 = "any nyme";
+
+			// put content addressable (like libraries)
+			client.put(jobID, name1, data);
+			client.put(jobID, name2, new byte[712]);
+
+
+			// issue a DELETE ALL request
+			client.deleteAll(jobID);
+			client.close();
+
+			client = new BlobClient(serverAddress);
+			try {
+				client.get(jobID, name1);
+				fail("BLOB should have been deleted");
+			}
+			catch (IOException e) {
+				// expected
+			}
+
+			try {
+				client.put(new byte[1]);
+				fail("client should be closed after erroneous operation");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
+
+			client = new BlobClient(serverAddress);
+			try {
+				client.get(jobID, name2);
+				fail("BLOB should have been deleted");
+			}
+			catch (IOException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (client != null) {
+				try {
+					client.close();
+				} catch (Throwable t) {
+					t.printStackTrace();
+				}
+			}
+			if (server != null) {
+				server.shutdown();
+			}
+		}
+	}
+
+	@Test
+	public void testDeleteAlreadyDeletedByBlobKey() {
+		BlobServer server = null;
+		BlobClient client = null;
+
+		try {
+			Configuration config = new Configuration();
+			server = new BlobServer(config);
+
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+			client = new BlobClient(serverAddress);
+
+			byte[] data = new byte[2000000];
+			rnd.nextBytes(data);
+
+			// put content addressable (like libraries)
+			BlobKey key = client.put(data);
+			assertNotNull(key);
+
+			File blobFile = server.getStorageLocation(key);
+			assertTrue(blobFile.delete());
+
+			// issue a DELETE request
+			try {
+				client.delete(key);
+			}
+			catch (IOException e) {
+				fail("DELETE operation should not fail if file is already deleted");
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (client != null) {
+				try {
+					client.close();
+				} catch (Throwable t) {
+					t.printStackTrace();
+				}
+			}
+			if (server != null) {
+				server.shutdown();
+			}
+		}
+	}
+
+	@Test
+	public void testDeleteAlreadyDeletedByName() {
+		BlobServer server = null;
+		BlobClient client = null;
+
+		try {
+			Configuration config = new Configuration();
+			server = new BlobServer(config);
+
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+			client = new BlobClient(serverAddress);
+
+			byte[] data = new byte[2000000];
+			rnd.nextBytes(data);
+
+			JobID jid = new JobID();
+			String name = "------------fdghljEgRJHF+##4U789Q345";
+
+			client.put(jid, name, data);
+
+			File blobFile = server.getStorageLocation(jid, name);
+			assertTrue(blobFile.delete());
+
+			// issue a DELETE request
+			try {
+				client.delete(jid, name);
+			}
+			catch (IOException e) {
+				fail("DELETE operation should not fail if file is already deleted");
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (client != null) {
+				try {
+					client.close();
+				} catch (Throwable t) {
+					t.printStackTrace();
+				}
+			}
+			if (server != null) {
+				server.shutdown();
+			}
+		}
+	}
+
+	@Test
+	public void testDeleteFails() {
+		BlobServer server = null;
+		BlobClient client = null;
+
+		try {
+			Configuration config = new Configuration();
+			server = new BlobServer(config);
+
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+			client = new BlobClient(serverAddress);
+
+			byte[] data = new byte[2000000];
+			rnd.nextBytes(data);
+
+			// put content addressable (like libraries)
+			BlobKey key = client.put(data);
+			assertNotNull(key);
+
+			File blobFile = server.getStorageLocation(key);
+			File directory = blobFile.getParentFile();
+
+			assertTrue(blobFile.setWritable(false, false));
+			assertTrue(directory.setWritable(false, false));
+
+			// issue a DELETE request
+			try {
+				client.delete(key);
+				fail("DELETE operation should fail if file cannot be deleted");
+			}
+			catch (IOException e) {
+				// expected
+			}
+			finally {
+				blobFile.setWritable(true, false);
+				directory.setWritable(true, false);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (client != null) {
+				try {
+					client.close();
+				} catch (Throwable t) {
+					t.printStackTrace();
+				}
+			}
+			if (server != null) {
+				server.shutdown();
+			}
+		}
+	}
+}
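
For context, the DELETE paths exercised by the tests above boil down to a short client-side round trip. The following is a minimal sketch only, reusing the constructors and methods that appear in these tests (BlobServer(Configuration), getPort(), BlobClient(InetSocketAddress), put, delete, deleteAll); the class name is made up for illustration and the code is not part of this patch.

	package org.apache.flink.runtime.blob;

	import java.net.InetSocketAddress;

	import org.apache.flink.configuration.Configuration;
	import org.apache.flink.runtime.jobgraph.JobID;

	public class BlobDeleteSketch {

		public static void main(String[] args) throws Exception {
			BlobServer server = new BlobServer(new Configuration());
			BlobClient client = new BlobClient(new InetSocketAddress("localhost", server.getPort()));
			try {
				// content addressable BLOB: delete by the key returned from put()
				BlobKey key = client.put(new byte[] { 1, 2, 3 });

				// name addressable BLOB: delete by (job ID, name), or everything under a job at once
				JobID jobId = new JobID();
				client.put(jobId, "some name", new byte[] { 4, 5, 6 });

				client.delete(key);
				client.deleteAll(jobId);
			}
			finally {
				client.close();
				server.shutdown();
			}
		}
	}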

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
new file mode 100644
index 0000000..c564670
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests how GET requests against the BLOB server behave in the presence of failures.
+ * Successful GET requests are tested in conjunction with the PUT
+ * requests.
+ */
+public class BlobServerGetTest {
+
+	private final Random rnd = new Random();
+
+	@Test
+	public void testGetFailsDuringLookup() {
+		BlobServer server = null;
+		BlobClient client = null;
+
+		try {
+			Configuration config = new Configuration();
+			server = new BlobServer(config);
+
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+			client = new BlobClient(serverAddress);
+
+			byte[] data = new byte[2000000];
+			rnd.nextBytes(data);
+
+			// put content addressable (like libraries)
+			BlobKey key = client.put(data);
+			assertNotNull(key);
+
+			// delete all files to make sure that GET requests fail
+			File blobFile = server.getStorageLocation(key);
+			assertTrue(blobFile.delete());
+
+			// issue a GET request that fails
+			try {
+				client.get(key);
+				fail("This should not succeed.");
+			}
+			catch (IOException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (client != null) {
+				try {
+					client.close();
+				} catch (Throwable t) {
+					t.printStackTrace();
+				}
+			}
+			if (server != null) {
+				server.shutdown();
+			}
+		}
+	}
+
+	@Test
+	public void testGetFailsDuringStreaming() {
+		BlobServer server = null;
+		BlobClient client = null;
+
+		try {
+			Configuration config = new Configuration();
+			server = new BlobServer(config);
+
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+			client = new BlobClient(serverAddress);
+
+			byte[] data = new byte[5000000];
+			rnd.nextBytes(data);
+
+			// put content addressable (like libraries)
+			BlobKey key = client.put(data);
+			assertNotNull(key);
+
+			// issue a GET request that succeeds
+			InputStream is = client.get(key);
+
+			byte[] receiveBuffer = new byte[50000];
+			BlobUtils.readFully(is, receiveBuffer, 0, receiveBuffer.length, null);
+			BlobUtils.readFully(is, receiveBuffer, 0, receiveBuffer.length, null);
+
+			// close all currently active connections on the server side to break the transfer
+			for (BlobServerConnection conn : server.getCurrentyActiveConnections()) {
+				conn.close();
+			}
+
+			try {
+				byte[] remainder = new byte[data.length - 2*receiveBuffer.length];
+				BlobUtils.readFully(is, remainder, 0, remainder.length, null);
+				fail("Reading from the aborted connection should fail with an IOException");
+			}
+			catch (IOException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (client != null) {
+				try {
+					client.close();
+				} catch (Throwable t) {
+					t.printStackTrace();
+				}
+			}
+			if (server != null) {
+				server.shutdown();
+			}
+		}
+	}
+}
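
For reference, the successful GET path that these failure tests build on looks roughly as follows. This is a sketch only, placed in the same package as the test so the same types are visible; it mirrors the calls used above (client.get returning an InputStream, BlobUtils.readFully as the read helper with a trailing null argument exactly as in the tests), and the class name is invented for illustration.

	package org.apache.flink.runtime.blob;

	import java.io.InputStream;
	import java.net.InetSocketAddress;

	import org.apache.flink.configuration.Configuration;

	public class BlobGetSketch {

		public static void main(String[] args) throws Exception {
			BlobServer server = new BlobServer(new Configuration());
			BlobClient client = new BlobClient(new InetSocketAddress("localhost", server.getPort()));
			try {
				byte[] data = new byte[1024];
				BlobKey key = client.put(data);

				// a GET returns a stream over the stored bytes; read it back completely
				InputStream is = client.get(key);
				byte[] result = new byte[data.length];
				BlobUtils.readFully(is, result, 0, result.length, null);
				is.close();
			}
			finally {
				client.close();
				server.shutdown();
			}
		}
	}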

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
new file mode 100644
index 0000000..1f8d29b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for successful and failing PUT operations against the BLOB server,
+ * and successful GET operations.
+ */
+public class BlobServerPutTest {
+
+	private final Random rnd = new Random();
+
+	@Test
+	public void testPutBufferSuccessful() {
+		BlobServer server = null;
+		BlobClient client = null;
+
+		try {
+			Configuration config = new Configuration();
+			server = new BlobServer(config);
+
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+			client = new BlobClient(serverAddress);
+
+			byte[] data = new byte[2000000];
+			rnd.nextBytes(data);
+
+			// put content addressable (like libraries)
+			BlobKey key1 = client.put(data);
+			assertNotNull(key1);
+
+			BlobKey key2 = client.put(data, 10, 44);
+			assertNotNull(key2);
+
+			// put under job and name scope
+			JobID jid = new JobID();
+			String stringKey = "my test key";
+			client.put(jid, stringKey, data);
+
+			// --- GET the data and check that it is equal ---
+
+			// one get request on the same client
+			InputStream is1 = client.get(key2);
+			byte[] result1 = new byte[44];
+			BlobUtils.readFully(is1, result1, 0, result1.length, null);
+			is1.close();
+
+			for (int i = 0, j = 10; i < result1.length; i++, j++) {
+				assertEquals(data[j], result1[i]);
+			}
+
+			// close the client and create a new one for the remaining requests
+			client.close();
+			client = new BlobClient(serverAddress);
+
+			InputStream is2 = client.get(key1);
+			byte[] result2 = new byte[data.length];
+			BlobUtils.readFully(is2, result2, 0, result2.length, null);
+			is2.close();
+			assertArrayEquals(data, result2);
+
+			InputStream is3 = client.get(jid, stringKey);
+			byte[] result3 = new byte[data.length];
+			BlobUtils.readFully(is3, result3, 0, result3.length, null);
+			is3.close();
+			assertArrayEquals(data, result3);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (client != null) {
+				try {
+					client.close();
+				} catch (Throwable t) {
+					t.printStackTrace();
+				}
+			}
+			if (server != null) {
+				server.shutdown();
+			}
+		}
+	}
+
+
+	@Test
+	public void testPutStreamSuccessful() {
+		BlobServer server = null;
+		BlobClient client = null;
+
+		try {
+			Configuration config = new Configuration();
+			server = new BlobServer(config);
+
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+			client = new BlobClient(serverAddress);
+
+			byte[] data = new byte[2000000];
+			rnd.nextBytes(data);
+
+			// put content addressable (like libraries)
+			{
+				BlobKey key1 = client.put(new ByteArrayInputStream(data));
+				assertNotNull(key1);
+
+			}
+
+			// put under job and name scope
+			{
+				JobID jid = new JobID();
+				String stringKey = "my test key";
+				client.put(jid, stringKey, new ByteArrayInputStream(data));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (client != null) {
+				try {
+					client.close();
+				} catch (Throwable t) {
+					t.printStackTrace();
+				}
+			}
+			if (server != null) {
+				server.shutdown();
+			}
+		}
+	}
+
+	@Test
+	public void testPutChunkedStreamSuccessful() {
+		BlobServer server = null;
+		BlobClient client = null;
+
+		try {
+			Configuration config = new Configuration();
+			server = new BlobServer(config);
+
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+			client = new BlobClient(serverAddress);
+
+			byte[] data = new byte[2000000];
+			rnd.nextBytes(data);
+
+			// put content addressable (like libraries)
+			{
+				BlobKey key1 = client.put(new ChunkedInputStream(data, 19));
+				assertNotNull(key1);
+
+			}
+
+			// put under job and name scope
+			{
+				JobID jid = new JobID();
+				String stringKey = "my test key";
+				client.put(jid, stringKey, new ChunkedInputStream(data, 17));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (client != null) {
+				try {
+					client.close();
+				} catch (Throwable t) {
+					t.printStackTrace();
+				}
+			}
+			if (server != null) {
+				server.shutdown();
+			}
+		}
+	}
+
+	@Test
+	public void testPutBufferFails() {
+		BlobServer server = null;
+		BlobClient client = null;
+
+		File tempFileDir = null;
+		try {
+			Configuration config = new Configuration();
+			server = new BlobServer(config);
+
+			// make sure the blob server cannot create any files in its storage dir
+			tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile();
+			assertTrue(tempFileDir.setExecutable(true, false));
+			assertTrue(tempFileDir.setReadable(true, false));
+			assertTrue(tempFileDir.setWritable(false, false));
+
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+			client = new BlobClient(serverAddress);
+
+			byte[] data = new byte[2000000];
+			rnd.nextBytes(data);
+
+			// put content addressable (like libraries)
+			try {
+				client.put(data);
+				fail("This should fail.");
+			}
+			catch (IOException e) {
+				assertTrue(e.getMessage(), e.getMessage().contains("Server side error"));
+			}
+
+			try {
+				client.put(data);
+				fail("Client should be closed");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
+
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			// set writable again to make sure we can remove the directory
+			if (tempFileDir != null) {
+				tempFileDir.setWritable(true, false);
+			}
+			if (client != null) {
+				try {
+					client.close();
+				} catch (Throwable t) {
+					t.printStackTrace();
+				}
+			}
+			if (server != null) {
+				server.shutdown();
+			}
+		}
+	}
+
+	@Test
+	public void testPutNamedBufferFails() {
+		BlobServer server = null;
+		BlobClient client = null;
+
+		File tempFileDir = null;
+		try {
+			Configuration config = new Configuration();
+			server = new BlobServer(config);
+
+			// make sure the blob server cannot create any files in its storage dir
+			tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile();
+			assertTrue(tempFileDir.setExecutable(true, false));
+			assertTrue(tempFileDir.setReadable(true, false));
+			assertTrue(tempFileDir.setWritable(false, false));
+
+			InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+			client = new BlobClient(serverAddress);
+
+			byte[] data = new byte[2000000];
+			rnd.nextBytes(data);
+
+			// put under job and name scope
+			try {
+				JobID jid = new JobID();
+				String stringKey = "my test key";
+				client.put(jid, stringKey, data);
+				fail("This should fail.");
+			}
+			catch (IOException e) {
+				assertTrue(e.getMessage(), e.getMessage().contains("Server side error"));
+			}
+
+			try {
+				JobID jid = new JobID();
+				String stringKey = "another key";
+				client.put(jid, stringKey, data);
+				fail("Client should be closed");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			// set writable again to make sure we can remove the directory
+			if (tempFileDir != null) {
+				tempFileDir.setWritable(true, false);
+			}
+			if (client != null) {
+				try {
+					client.close();
+				} catch (Throwable t) {
+					t.printStackTrace();
+				}
+			}
+			if (server != null) {
+				server.shutdown();
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static final class ChunkedInputStream extends InputStream {
+
+		private final byte[][] data;
+
+		private int x = 0, y = 0;
+
+
+		private ChunkedInputStream(byte[] data, int numChunks) {
+			this.data = new byte[numChunks][];
+
+			int bytesPerChunk = data.length / numChunks;
+			int bytesTaken = 0;
+			for (int i = 0; i < numChunks - 1; i++, bytesTaken += bytesPerChunk) {
+				this.data[i] = new byte[bytesPerChunk];
+				System.arraycopy(data, bytesTaken, this.data[i], 0, bytesPerChunk);
+			}
+
+			this.data[numChunks - 1] = new byte[data.length - bytesTaken];
+			System.arraycopy(data, bytesTaken, this.data[numChunks - 1], 0, this.data[numChunks - 1].length);
+		}
+
+		@Override
+		public int read() {
+			if (x < data.length) {
+				byte[] curr = data[x];
+				if (y < curr.length) {
+					byte next = curr[y];
+					y++;
+					return next;
+				}
+				else {
+					y = 0;
+					x++;
+					return read();
+				}
+			} else {
+				return -1;
+			}
+		}
+
+		@Override
+		public int read(byte[] b, int off, int len) throws IOException {
+			if (len == 0) {
+				return 0;
+			}
+			if (x < data.length) {
+				byte[] curr = data[x];
+				if (y < curr.length) {
+					int toCopy = Math.min(len, curr.length - y);
+					System.arraycopy(curr, y, b, off, toCopy);
+					y += toCopy;
+					return toCopy;
+				} else {
+					y = 0;
+					x++;
+					return read(b, off, len);
+				}
+			}
+			else {
+				return -1;
+			}
+		}
+	}
+}
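
Condensed from the successful-path tests above, the PUT variants offered by the client can be sketched as follows. Illustration only: the class name is made up, and everything else mirrors calls that appear in the tests (put of a whole buffer, of a sub-range, of a stream, and of a name-addressable BLOB under a job ID).

	package org.apache.flink.runtime.blob;

	import java.io.ByteArrayInputStream;
	import java.net.InetSocketAddress;
	import java.util.Random;

	import org.apache.flink.configuration.Configuration;
	import org.apache.flink.runtime.jobgraph.JobID;

	public class BlobPutSketch {

		public static void main(String[] args) throws Exception {
			BlobServer server = new BlobServer(new Configuration());
			BlobClient client = new BlobClient(new InetSocketAddress("localhost", server.getPort()));
			try {
				byte[] data = new byte[4096];
				new Random().nextBytes(data);

				BlobKey key1 = client.put(data);                           // whole buffer, content addressable
				BlobKey key2 = client.put(data, 10, 44);                   // sub-range of a buffer
				BlobKey key3 = client.put(new ByteArrayInputStream(data)); // from a stream

				client.put(new JobID(), "my artifact", data);              // under job ID and name (no key returned)
			}
			finally {
				client.close();
				server.shutdown();
			}
		}
	}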

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
new file mode 100644
index 0000000..93f9b73
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+
+public class TestingFailingBlobServer extends BlobServer {
+
+	private int numFailures;
+
+	public TestingFailingBlobServer(Configuration config, int numFailures) throws IOException {
+		super(config);
+		this.numFailures = numFailures;
+	}
+
+	@Override
+	public void run() {
+
+		// handle the first operation (the initial PUT) properly
+		try {
+			new BlobServerConnection(getServerSocket().accept(), this).start();
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+		}
+
+		// do some failing operations
+		for (int num = 0; num < numFailures && !isShutdown(); num++) {
+			Socket socket = null;
+			try {
+				socket = getServerSocket().accept();
+				InputStream is = socket.getInputStream();
+				OutputStream os = socket.getOutputStream();
+
+				// just abort everything
+				is.close();
+				os.close();
+				socket.close();
+			}
+			catch (IOException e) {
+				// ignore - this connection is aborted deliberately
+			}
+			finally {
+				if (socket != null) {
+					try {
+						socket.close();
+					} catch(Throwable t) {}
+				}
+			}
+		}
+
+		// regular runs
+		super.run();
+	}
+}
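
As a usage note, a test might wire up this failing server roughly as below. This is a sketch under stated assumptions: the client-side reconnect behavior is illustrative only, and the class name is invented; only the constructor and the methods shown in the diffs above are relied on.

	package org.apache.flink.runtime.blob;

	import java.net.InetSocketAddress;

	import org.apache.flink.configuration.Configuration;

	public class FailingBlobServerSketch {

		public static void main(String[] args) throws Exception {
			// the first accepted connection is served normally, the next two are aborted,
			// and after that the server behaves like a regular BlobServer
			BlobServer server = new TestingFailingBlobServer(new Configuration(), 2);
			InetSocketAddress address = new InetSocketAddress("localhost", server.getPort());

			// the initial PUT goes through the properly handled first connection
			BlobClient client = new BlobClient(address);
			BlobKey key = client.put(new byte[] { 1, 2, 3 });
			client.close();

			// subsequent connections are aborted until the failure budget (2) is used up,
			// so callers are expected to reconnect and retry before normal service resumes
			server.shutdown();
		}
	}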

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 5029b15..1d2b6ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -52,7 +52,7 @@ public class BlobLibraryCacheManagerTest {
 
 		try {
 			server = new BlobServer(new Configuration());
-			InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getServerPort());
+			InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort());
 			BlobClient bc = new BlobClient(blobSocketAddress);
 
 			keys.add(bc.put(buf));

