flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/2] flink git commit: [FLINK-1578] [BLOB manager] Improve failure handling and add more failure tests.
Date Wed, 18 Feb 2015 14:05:46 GMT
[FLINK-1578] [BLOB manager] Improve failure handling and add more failure tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cfce493f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cfce493f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cfce493f

Branch: refs/heads/master
Commit: cfce493feb70a49d2722dc2a0d79f845f7e0461a
Parents: 47fed3d
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Feb 18 12:16:35 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 18 13:59:00 2015 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  55 +-
 .../org/apache/flink/runtime/AbstractID.java    |  93 ++-
 .../apache/flink/runtime/blob/BlobCache.java    | 164 +++--
 .../apache/flink/runtime/blob/BlobClient.java   | 700 ++++++++++++-------
 .../flink/runtime/blob/BlobConnection.java      | 337 ---------
 .../flink/runtime/blob/BlobInputStream.java     |  10 +-
 .../org/apache/flink/runtime/blob/BlobKey.java  |   3 +-
 .../apache/flink/runtime/blob/BlobServer.java   | 261 +++----
 .../runtime/blob/BlobServerConnection.java      | 466 ++++++++++++
 .../flink/runtime/blob/BlobServerProtocol.java  |  59 ++
 .../apache/flink/runtime/blob/BlobService.java  |  19 +-
 .../apache/flink/runtime/blob/BlobUtils.java    | 122 +++-
 .../apache/flink/runtime/AbstractIDTest.java    |  42 ++
 .../runtime/blob/BlobCacheRetriesTest.java      | 150 ++++
 .../runtime/blob/BlobCacheSuccessTest.java      | 121 ++++
 .../flink/runtime/blob/BlobCacheTest.java       | 121 ----
 .../flink/runtime/blob/BlobClientTest.java      | 169 +++--
 .../runtime/blob/BlobServerDeleteTest.java      | 323 +++++++++
 .../flink/runtime/blob/BlobServerGetTest.java   | 149 ++++
 .../flink/runtime/blob/BlobServerPutTest.java   | 402 +++++++++++
 .../runtime/blob/TestingFailingBlobServer.java  |  74 ++
 .../BlobLibraryCacheManagerTest.java            |   2 +-
 22 files changed, 2755 insertions(+), 1087 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index a0bf365..42a3c9a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -42,17 +42,6 @@ public final class ConfigConstants {
 	public static final String DEFAULT_EXECUTION_RETRIES_KEY = "execution-retries.default";
 	
 	// -------------------------------- Runtime -------------------------------
-
-	/**
-	 * The config parameter defining the storage directory to be used by the blob server.
-	 */
-	public static final String BLOB_STORAGE_DIRECTORY_KEY = "blob.storage.directory";
-
-	/**
-	 * The config parameter defining the cleanup interval of the library cache manager.
-	 */
-	public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = "library-cache-manager" +
-			".cleanup.interval";
 	
 	/**
 	 * The config parameter defining the network address to connect to
@@ -71,7 +60,32 @@ public final class ConfigConstants {
 	 * marked as failed.
 	 */
 	public static final String JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY = "jobmanager.max-heartbeat-delay-before-failure.msecs";
-	
+
+	/**
+	 * The config parameter defining the storage directory to be used by the blob server.
+	 */
+	public static final String BLOB_STORAGE_DIRECTORY_KEY = "blob.storage.directory";
+
+	/**
+	 * The config parameter defining the number of retries for failed BLOB fetches.
+	 */
+	public static final String BLOB_FETCH_RETRIES_KEY = "blob.fetch.retries";
+
+	/**
+	 * The config parameter defining the maximum number of concurrent BLOB fetches that the JobManager serves.
+	 */
+	public static final String BLOB_FETCH_CONCURRENT_KEY = "blob.fetch.num-concurrent";
+
+	/**
+	 * The config parameter defining the backlog of BLOB fetches on the JobManager
+	 */
+	public static final String BLOB_FETCH_BACKLOG_KEY = "blob.fetch.backlog";
+
+	/**
+	 * The config parameter defining the cleanup interval of the library cache manager.
+	 */
+	public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = "library-cache-manager.cleanup.interval";
+
 	/**
 	 * The config parameter defining the task manager's IPC port from the configuration.
 	 */
@@ -405,7 +419,22 @@ public final class ConfigConstants {
 	 */
 	// 30 seconds (its enough to get to mars, should be enough to detect failure)
 	public static final int DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT = 30*1000;
-	
+
+	/**
+	 * Default number of retries for failed BLOB fetches.
+	 */
+	public static final int DEFAULT_BLOB_FETCH_RETRIES = 5;
+
+	/**
+	 * Default number of concurrent BLOB fetch operations.
+	 */
+	public static final int DEFAULT_BLOB_FETCH_CONCURRENT = 50;
+
+	/**
+	 * Default BLOB fetch connection backlog.
+	 */
+	public static final int DEFAULT_BLOB_FETCH_BACKLOG = 1000;
+
 	/**
 	 * The default network port the task manager expects incoming IPC connections. The {@code 0} means that
 	 * the TaskManager searches for a free port.

http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
index 130e3eb..247a052 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Random;
 
 import org.apache.flink.core.io.IOReadableWritable;
@@ -103,48 +101,39 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j
 	}
 	
 	// --------------------------------------------------------------------------------------------
-	
+
+	/**
+	 * Gets the lower 64 bits of the ID.
+	 *
+	 * @return The lower 64 bits of the ID.
+	 */
 	public long getLowerPart() {
 		return lowerPart;
 	}
-	
-	public long getUpperPart() {
-		return upperPart;
-	}
-
-	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Converts the given byte array to a long.
+	 * Gets the upper 64 bits of the ID.
 	 *
-	 * @param ba the byte array to be converted
-	 * @param offset the offset indicating at which byte inside the array the conversion shall begin
-	 * @return the long variable
+	 * @return The upper 64 bits of the ID.
 	 */
-	private static long byteArrayToLong(byte[] ba, int offset) {
-		long l = 0;
-
-		for (int i = 0; i < SIZE_OF_LONG; ++i) {
-			l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
-		}
-
-		return l;
+	public long getUpperPart() {
+		return upperPart;
 	}
 
 	/**
-	 * Converts a long to a byte array.
+	 * Gets the bytes underlying this ID.
 	 *
-	 * @param l the long variable to be converted
-	 * @param ba the byte array to store the result the of the conversion
-	 * @param offset offset indicating at what position inside the byte array the result of the conversion shall be stored
+	 * @return The bytes underlying this ID.
 	 */
-	private static void longToByteArray(final long l, final byte[] ba, final int offset) {
-		for (int i = 0; i < SIZE_OF_LONG; ++i) {
-			final int shift = i << 3; // i * 8
-			ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift);
-		}
+	public byte[] getBytes() {
+		byte[] bytes = new byte[SIZE];
+		longToByteArray(lowerPart, bytes, 0);
+		longToByteArray(upperPart, bytes, SIZE_OF_LONG);
+		return bytes;
 	}
-	
+
+	// --------------------------------------------------------------------------------------------
+	//  Serialization
 	// --------------------------------------------------------------------------------------------
 
 	@Override
@@ -159,17 +148,14 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j
 		out.writeLong(this.upperPart);
 	}
 
-	public void write(ByteBuffer buffer) {
-		buffer.putLong(this.lowerPart);
-		buffer.putLong(this.upperPart);
-	}
-
 	public void writeTo(ByteBuf buf) {
 		buf.writeLong(this.lowerPart);
 		buf.writeLong(this.upperPart);
 	}
 
 	// --------------------------------------------------------------------------------------------
+	//  Standard Utilities
+	// --------------------------------------------------------------------------------------------
 	
 	@Override
 	public boolean equals(Object obj) {
@@ -203,4 +189,39 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j
 		int diff2 = (this.lowerPart < o.lowerPart) ? -1 : ((this.lowerPart == o.lowerPart) ? 0 : 1);
 		return diff1 == 0 ? diff2 : diff1;
 	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Conversion Utilities
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Converts the given byte array to a long.
+	 *
+	 * @param ba the byte array to be converted
+	 * @param offset the offset indicating at which byte inside the array the conversion shall begin
+	 * @return the long variable
+	 */
+	private static long byteArrayToLong(byte[] ba, int offset) {
+		long l = 0;
+
+		for (int i = 0; i < SIZE_OF_LONG; ++i) {
+			l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
+		}
+
+		return l;
+	}
+
+	/**
+	 * Converts a long to a byte array.
+	 *
+	 * @param l the long variable to be converted
+	 * @param ba the byte array to store the result of the conversion
+	 * @param offset offset indicating at what position inside the byte array the result of the conversion shall be stored
+	 */
+	private static void longToByteArray(long l, byte[] ba, int offset) {
+		for (int i = 0; i < SIZE_OF_LONG; ++i) {
+			final int shift = i << 3; // i * 8
+			ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 40ec4e3..0d1b29c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -40,9 +41,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public final class BlobCache implements BlobService {
 
-	/**
-	 * The log object used for debugging.
-	 */
+	/** The log object used for debugging. */
 	private static final Logger LOG = LoggerFactory.getLogger(BlobCache.class);
 
 	private final InetSocketAddress serverAddress;
@@ -54,6 +53,9 @@ public final class BlobCache implements BlobService {
 	/** Shutdown hook thread to ensure deletion of the storage directory. */
 	private final Thread shutdownHook;
 
+	/** The number of retries when the transfer fails */
+	private final int numFetchRetries;
+
 
 	public BlobCache(InetSocketAddress serverAddress, Configuration configuration) {
 		if (serverAddress == null || configuration == null) {
@@ -62,80 +64,122 @@ public final class BlobCache implements BlobService {
 
 		this.serverAddress = serverAddress;
 
+		// configure and create the storage directory
 		String storageDirectory = configuration.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
 		this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
 		LOG.info("Created BLOB cache storage directory " + storageDir);
 
+		// configure the number of fetch retries
+		final int fetchRetries = configuration.getInteger(
+				ConfigConstants.BLOB_FETCH_RETRIES_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_RETRIES);
+		if (fetchRetries >= 0) {
+			this.numFetchRetries = fetchRetries;
+		}
+		else {
+			LOG.warn("Invalid value for {}. System will attempt no retires on failed fetches of BLOBs.",
+					ConfigConstants.BLOB_FETCH_RETRIES_KEY);
+			this.numFetchRetries = 0;
+		}
+
 		// Add shutdown hook to delete storage directory
 		shutdownHook = BlobUtils.addShutdownHook(this, LOG);
 	}
 
 	/**
-	 * Returns the URL for the content-addressable BLOB with the given key. The method will first attempt to serve
-	 * the BLOB from its local cache. If one or more BLOB are not in the cache, the method will try to download them
-	 * from the BLOB server with the given address.
+	 * Returns the URL for the BLOB with the given key. The method will first attempt to serve
+	 * the BLOB from its local cache. If the BLOB is not in the cache, the method will try to download it
+	 * from this cache's BLOB server.
 	 * 
-	 * @param requiredBlob
-	 *        the key of the desired content-addressable BLOB
-	 * @return URL referring to the local storage location of the BLOB
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while downloading the BLOBs from the BLOB server
+	 * @param requiredBlob The key of the desired BLOB.
+	 * @return URL referring to the local storage location of the BLOB.
+	 * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
 	 */
 	public URL getURL(final BlobKey requiredBlob) throws IOException {
 		if (requiredBlob == null) {
-			throw new IllegalArgumentException("Required BLOB cannot be null.");
+			throw new IllegalArgumentException("BLOB key cannot be null.");
 		}
 
-		BlobClient bc = null;
-		byte[] buf = null;
-		URL url = null;
+		final File localJarFile = BlobUtils.getStorageLocation(storageDir, requiredBlob);
 
-		try {
-			final File localJarFile = BlobUtils.getStorageLocation(storageDir, requiredBlob);
+		if (!localJarFile.exists()) {
 
-			if (!localJarFile.exists()) {
+			final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE];
 
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Trying to download " + requiredBlob + " from " + serverAddress);
-				}
+			// loop over retries
+			int attempt = 0;
+			while (true) {
 
-				bc = new BlobClient(serverAddress);
-				buf = new byte[BlobServer.BUFFER_SIZE];
+				if (attempt == 0) {
+					LOG.info("Downloading {} from {}", requiredBlob, serverAddress);
+				} else {
+					LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt);
+				}
 
-				InputStream is = null;
-				OutputStream os = null;
 				try {
-					is = bc.get(requiredBlob);
-					os = new FileOutputStream(localJarFile);
+					BlobClient bc = null;
+					InputStream is = null;
+					OutputStream os = null;
+
+					try {
+						bc = new BlobClient(serverAddress);
+						is = bc.get(requiredBlob);
+						os = new FileOutputStream(localJarFile);
+
+						while (true) {
+							final int read = is.read(buf);
+							if (read < 0) {
+								break;
+							}
+							os.write(buf, 0, read);
+						}
 
-					while (true) {
+						// we explicitly do not use a finally block, because we want the closing
+						// in the regular case to throw exceptions and cause the writing to fail.
+						// But, the closing on exception should not throw further exceptions and
+						// let us keep the root exception
+						os.close();
+						os = null;
+						is.close();
+						is = null;
+						bc.close();
+						bc = null;
 
-						final int read = is.read(buf);
-						if (read < 0) {
-							break;
+						// success, we finished
+						break;
+					}
+					catch (Throwable t) {
+						// we use "catch (Throwable)" to keep the root exception. Otherwise that exception
+						// would be replaced by any exception thrown in the finally block
+						closeSilently(os);
+						closeSilently(is);
+						closeSilently(bc);
+
+						if (t instanceof IOException) {
+							throw (IOException) t;
+						} else {
+							throw new IOException(t.getMessage(), t);
 						}
-
-						os.write(buf, 0, read);
 					}
-				} finally {
-					if (is != null) {
-						is.close();
+				}
+				catch (IOException e) {
+					String message = "Failed to fetch BLOB " + requiredBlob + "  from " + serverAddress + '.';
+					if (attempt < numFetchRetries) {
+						attempt++;
+						if (LOG.isDebugEnabled()) {
+							LOG.debug(message + " Retrying...", e);
+						} else {
+							LOG.error(message + " Retrying...");
+						}
 					}
-					if (os != null) {
-						os.close();
+					else {
+						LOG.error(message + " No retries left.", e);
+						throw new IOException(message, e);
 					}
 				}
-			}
-			url = localJarFile.toURI().toURL();
-
-
-		} finally {
-			if (bc != null) {
-				bc.close();
-			}
+			} // end loop over retries
 		}
 
-		return url;
+		return localJarFile.toURI().toURL();
 	}
 
 	/**
@@ -145,8 +189,10 @@ public final class BlobCache implements BlobService {
 	public void delete(BlobKey key) throws IOException{
 		final File localFile = BlobUtils.getStorageLocation(storageDir, key);
 
-		if(localFile.exists()) {
-			localFile.delete();
+		if (localFile.exists()) {
+			if (!localFile.delete()) {
+				LOG.warn("Failed to delete locally cached BLOB " + key + " at " + localFile.getAbsolutePath());
+			}
 		}
 	}
 
@@ -180,4 +226,24 @@ public final class BlobCache implements BlobService {
 			}
 		}
 	}
+
+	public File getStorageDir() {
+		return this.storageDir;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Miscellaneous
+	// ------------------------------------------------------------------------
+
+	private void closeSilently(Closeable closeable) {
+		if (closeable != null) {
+			try {
+				closeable.close();
+			} catch (Throwable t) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Error while closing resource after BLOB transfer.", t);
+				}
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 9a0479f..cb799c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -19,30 +19,45 @@
 package org.apache.flink.runtime.blob;
 
 import java.io.Closeable;
+import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 
-import org.apache.flink.runtime.AbstractID;
+import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
 import org.apache.flink.runtime.jobgraph.JobID;
 
+import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_ID_SCOPE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.NAME_ADDRESSABLE;
+import static org.apache.flink.runtime.blob.BlobUtils.readFully;
+import static org.apache.flink.runtime.blob.BlobUtils.readLength;
+import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.MAX_KEY_LENGTH;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
+
 /**
- * The BLOB client can communicate with the BLOB server and either upload (PUT), download (GET), or delete (DELETE)
- * BLOBs.
- * <p>
- * This class is not thread-safe.
+ * The BLOB client can communicate with the BLOB server and either upload (PUT), download (GET),
+ * or delete (DELETE) BLOBs.
  */
 public final class BlobClient implements Closeable {
 
-	/**
-	 * The socket connection to the BLOB server.
-	 */
-	private Socket socket;
+	private static final Logger LOG = LoggerFactory.getLogger(BlobClient.class);
+
+	/** The socket connection to the BLOB server. */
+	private final Socket socket;
 
 	/**
 	 * Instantiates a new BLOB client.
@@ -52,71 +67,177 @@ public final class BlobClient implements Closeable {
 	 * @throws IOException
 	 *         thrown if the connection to the BLOB server could not be established
 	 */
-	public BlobClient(final InetSocketAddress serverAddress) throws IOException {
-
+	public BlobClient(InetSocketAddress serverAddress) throws IOException {
 		this.socket = new Socket();
 		try {
 			this.socket.connect(serverAddress);
-		}catch(IOException e){
+		}
+		catch(IOException e) {
+			BlobUtils.closeSilently(socket, LOG);
 			throw new IOException("Could not connect to BlobServer at address " + serverAddress, e);
 		}
 	}
 
+	@Override
+	public void close() throws IOException {
+		this.socket.close();
+	}
+
+	public boolean isClosed() {
+		return this.socket.isClosed();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  GET
+	// --------------------------------------------------------------------------------------------
+
 	/**
-	 * Constructs and writes the header data for a PUT request to the given output stream.
+	 * Downloads the BLOB identified by the given job ID and key from the BLOB server. If no such BLOB exists on the
+	 * server, a {@link FileNotFoundException} is thrown.
+	 * 
+	 * @param jobID
+	 *        the job ID identifying the BLOB to download
+	 * @param key
+	 *        the key identifying the BLOB to download
+	 * @return an input stream to read the retrieved data from
+	 * @throws IOException
+	 *         thrown if an I/O error occurs during the download
+	 */
+	public InputStream get(JobID jobID, String key) throws IOException {
+		if (key.length() > MAX_KEY_LENGTH) {
+			throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH);
+		}
+
+		if (this.socket.isClosed()) {
+			throw new IllegalStateException("BLOB Client is not connected. " +
+					"Client has been shut down or encountered an error before.");
+		}
+		if (LOG.isDebugEnabled()) {
+			LOG.debug(String.format("GET BLOB %s / \"%s\" from %s", jobID, key, socket.getLocalSocketAddress()));
+		}
+
+		try {
+			OutputStream os = this.socket.getOutputStream();
+			InputStream is = this.socket.getInputStream();
+
+			sendGetHeader(os, jobID, key, null);
+			receiveAndCheckResponse(is);
+
+			return new BlobInputStream(is, null);
+		}
+		catch (Throwable t) {
+			BlobUtils.closeSilently(socket, LOG);
+			throw new IOException("GET operation failed: " + t.getMessage(), t);
+		}
+	}
+
+	/**
+	 * Downloads the BLOB identified by the given BLOB key from the BLOB server. If no such BLOB exists on the server, a
+	 * {@link FileNotFoundException} is thrown.
 	 * 
+	 * @param blobKey
+	 *        the BLOB key identifying the BLOB to download
+	 * @return an input stream to read the retrieved data from
+	 * @throws IOException
+	 *         thrown if an I/O error occurs during the download
+	 */
+	public InputStream get(BlobKey blobKey) throws IOException {
+		if (this.socket.isClosed()) {
+			throw new IllegalStateException("BLOB Client is not connected. " +
+					"Client has been shut down or encountered an error before.");
+		}
+		if (LOG.isDebugEnabled()) {
+			LOG.debug(String.format("GET content addressable BLOB %s from %s", blobKey, socket.getLocalSocketAddress()));
+		}
+
+		try {
+			OutputStream os = this.socket.getOutputStream();
+			InputStream is = this.socket.getInputStream();
+
+			// Send GET header
+			sendGetHeader(os, null, null, blobKey);
+			receiveAndCheckResponse(is);
+
+			return new BlobInputStream(is, blobKey);
+		}
+		catch (Throwable t) {
+			BlobUtils.closeSilently(socket, LOG);
+			throw new IOException("GET operation failed: " + t.getMessage(), t);
+		}
+	}
+
+	/**
+	 * Constructs and writes the header data for a GET operation to the given output stream.
+	 *
 	 * @param outputStream
-	 *        the output stream to write the PUT header data to
+	 *        the output stream to write the header data to
 	 * @param jobID
-	 *        the ID of job the BLOB belongs to or <code>null</code> to indicate the upload of a
-	 *        content-addressable BLOB
+	 *        the job ID identifying the BLOB to download or <code>null</code> to indicate the BLOB key should be used
+	 *        to identify the BLOB on the server instead
 	 * @param key
-	 *        the key of the BLOB to upload or <code>null</code> to indicate the upload of a content-addressable BLOB
-	 * @param buf
-	 *        an auxiliary buffer used for data serialization
+	 *        the key identifying the BLOB to download or <code>null</code> to indicate the BLOB key should be used to
+	 *        identify the BLOB on the server instead
+	 * @param blobKey
+	 *        the BLOB key to identify the BLOB to download if either the job ID or the regular key are
+	 *        <code>null</code>
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while writing the header data to the output stream
 	 */
-	private void sendPutHeader(final OutputStream outputStream, final JobID jobID, final String key, final byte[] buf)
-			throws IOException {
+	private void sendGetHeader(OutputStream outputStream, JobID jobID, String key, BlobKey blobKey) throws IOException {
 
 		// Signal type of operation
-		outputStream.write(BlobServer.PUT_OPERATION);
+		outputStream.write(GET_OPERATION);
 
-		// Check if PUT should be done in content-addressable manner
+		// Check if GET should be done in content-addressable manner
 		if (jobID == null || key == null) {
-			outputStream.write(1);
-		} else {
-			outputStream.write(0);
-			// Send job ID
-			final ByteBuffer bb = ByteBuffer.wrap(buf);
-			jobID.write(bb);
-			outputStream.write(buf);
-
-			// Send the key
+			outputStream.write(CONTENT_ADDRESSABLE);
+			blobKey.writeToOutputStream(outputStream);
+		}
+		else {
+			outputStream.write(NAME_ADDRESSABLE);
+			// Send job ID and key
+			outputStream.write(jobID.getBytes());
 			byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET);
-			BlobServer.writeLength(keyBytes.length, buf, outputStream);
+			writeLength(keyBytes.length, outputStream);
 			outputStream.write(keyBytes);
 		}
 	}
 
+	private void receiveAndCheckResponse(InputStream is) throws IOException {
+		int response = is.read();
+		if (response < 0) {
+			throw new EOFException("Premature end of response");
+		}
+		if (response == RETURN_ERROR) {
+			Throwable cause = readExceptionFromStream(is);
+			throw new IOException("Server side error: " + cause.getMessage(), cause);
+		}
+		else if (response != RETURN_OKAY) {
+			throw new IOException("Unrecognized response");
+		}
+	}
+
+
+	// --------------------------------------------------------------------------------------------
+	//  PUT
+	// --------------------------------------------------------------------------------------------
+
 	/**
 	 * Uploads the data of the given byte array to the BLOB server in a content-addressable manner.
-	 * 
+	 *
 	 * @param value
 	 *        the buffer to upload
 	 * @return the computed BLOB key identifying the BLOB on the server
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while uploading the data to the BLOB server
 	 */
-	public BlobKey put(final byte[] value) throws IOException {
-
+	public BlobKey put(byte[] value) throws IOException {
 		return put(value, 0, value.length);
 	}
 
 	/**
 	 * Uploads data from the given byte array to the BLOB server in a content-addressable manner.
-	 * 
+	 *
 	 * @param value
 	 *        the buffer to upload data from
 	 * @param offset
@@ -127,14 +248,13 @@ public final class BlobClient implements Closeable {
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while uploading the data to the BLOB server
 	 */
-	public BlobKey put(final byte[] value, final int offset, final int len) throws IOException {
-
+	public BlobKey put(byte[] value, int offset, int len) throws IOException {
 		return putBuffer(null, null, value, offset, len);
 	}
 
 	/**
 	 * Uploads the data of the given byte array to the BLOB server and stores it under the given job ID and key.
-	 * 
+	 *
 	 * @param jobId
 	 *        the job ID to identify the uploaded data
 	 * @param key
@@ -144,14 +264,13 @@ public final class BlobClient implements Closeable {
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while uploading the data to the BLOB server
 	 */
-	public void put(final JobID jobId, final String key, final byte[] value) throws IOException {
-
+	public void put(JobID jobId, String key, byte[] value) throws IOException {
 		put(jobId, key, value, 0, value.length);
 	}
 
 	/**
 	 * Uploads data from the given byte array to the BLOB server and stores it under the given job ID and key.
-	 * 
+	 *
 	 * @param jobId
 	 *        the job ID to identify the uploaded data
 	 * @param key
@@ -165,11 +284,9 @@ public final class BlobClient implements Closeable {
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while uploading the data to the BLOB server
 	 */
-	public void put(final JobID jobId, final String key, final byte[] value, final int offset, final int len)
-			throws IOException {
-
-		if (key.length() > BlobServer.MAX_KEY_LENGTH) {
-			throw new IllegalArgumentException("Keys must not be longer than " + BlobServer.MAX_KEY_LENGTH);
+	public void put(JobID jobId, String key, byte[] value, int offset, int len) throws IOException {
+		if (key.length() > MAX_KEY_LENGTH) {
+			throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH);
 		}
 
 		putBuffer(jobId, key, value, offset, len);
@@ -177,7 +294,7 @@ public final class BlobClient implements Closeable {
 
 	/**
 	 * Uploads data from the given input stream to the BLOB server and stores it under the given job ID and key.
-	 * 
+	 *
 	 * @param jobId
 	 *        the job ID to identify the uploaded data
 	 * @param key
@@ -188,10 +305,9 @@ public final class BlobClient implements Closeable {
 	 *         thrown if an I/O error occurs while reading the data from the input stream or uploading the data to the
 	 *         BLOB server
 	 */
-	public void put(final JobID jobId, final String key, final InputStream inputStream) throws IOException {
-
-		if (key.length() > BlobServer.MAX_KEY_LENGTH) {
-			throw new IllegalArgumentException("Keys must not be longer than " + BlobServer.MAX_KEY_LENGTH);
+	public void put(JobID jobId, String key, InputStream inputStream) throws IOException {
+		if (key.length() > MAX_KEY_LENGTH) {
+			throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH);
 		}
 
 		putInputStream(jobId, key, inputStream);
@@ -199,7 +315,7 @@ public final class BlobClient implements Closeable {
 
 	/**
 	 * Uploads the data from the given input stream to the BLOB server in a content-addressable manner.
-	 * 
+	 *
 	 * @param inputStream
 	 *        the input stream to read the data from
 	 * @return the computed BLOB key identifying the BLOB on the server
@@ -207,93 +323,13 @@ public final class BlobClient implements Closeable {
 	 *         thrown if an I/O error occurs while reading the data from the input stream or uploading the data to the
 	 *         BLOB server
 	 */
-	public BlobKey put(final InputStream inputStream) throws IOException {
-
+	public BlobKey put(InputStream inputStream) throws IOException {
 		return putInputStream(null, null, inputStream);
 	}
 
 	/**
-	 * Deletes the BLOB identified by the given job ID and key from the BLOB server.
-	 * 
-	 * @param jobId
-	 *        the job ID to identify the BLOB
-	 * @param key
-	 *        the key to identify the BLOB
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while transferring the request to the BLOB server
-	 */
-	public void delete(final JobID jobId, final String key) throws IOException {
-
-		if (jobId == null) {
-			throw new IllegalArgumentException("Argument jobID must not be null");
-		}
-
-		if (key == null) {
-			throw new IllegalArgumentException("Argument key must not be null");
-		}
-
-		if (key.length() > BlobServer.MAX_KEY_LENGTH) {
-			throw new IllegalArgumentException("Keys must not be longer than " + BlobServer.MAX_KEY_LENGTH);
-		}
-
-		deleteInternal(jobId, key);
-	}
-
-	/**
-	 * Deletes all BLOBs belonging to the job with the given ID from the BLOB server
-	 * 
-	 * @param jobId
-	 *        the job ID to identify the BLOBs to be deleted
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while transferring the request to the BLOB server
-	 */
-	public void deleteAll(final JobID jobId) throws IOException {
-
-		if (jobId == null) {
-			throw new IllegalArgumentException("Argument jobID must not be null");
-		}
-
-		deleteInternal(jobId, null);
-	}
-
-	/**
-	 * Delete one or multiple BLOBs from the BLOB server.
-	 * 
-	 * @param jobId
-	 *        the job ID to identify the BLOB(s) to be deleted
-	 * @param key
-	 *        the key to identify the specific BLOB to delete or <code>null</code> to delete all BLOBs associated with
-	 *        the job
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while transferring the request to the BLOB server
-	 */
-	private void deleteInternal(final JobID jobId, final String key) throws IOException {
-
-		final OutputStream os = this.socket.getOutputStream();
-		final byte[] buf = new byte[AbstractID.SIZE];
-
-		// Signal type of operation
-		os.write(BlobServer.DELETE_OPERATION);
-
-		// Send job ID
-		final ByteBuffer bb = ByteBuffer.wrap(buf);
-		jobId.write(bb);
-		os.write(buf);
-
-		if (key == null) {
-			os.write(0);
-		} else {
-			os.write(1);
-			// Send the key
-			byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET);
-			BlobServer.writeLength(keyBytes.length, buf, os);
-			os.write(keyBytes);
-		}
-	}
-
-	/**
 	 * Uploads data from the given byte buffer to the BLOB server.
-	 * 
+	 *
 	 * @param jobId
 	 *        the ID of the job the BLOB belongs to or <code>null</code> to store the BLOB in a content-addressable
 	 *        manner
@@ -311,56 +347,62 @@ public final class BlobClient implements Closeable {
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while uploading the data to the BLOB server
 	 */
-	private BlobKey putBuffer(final JobID jobId, final String key, final byte[] value, final int offset, final int len)
-			throws IOException {
+	private BlobKey putBuffer(JobID jobId, String key, byte[] value, int offset, int len) throws IOException {
+		if (this.socket.isClosed()) {
+			throw new IllegalStateException("BLOB Client is not connected. " +
+					"Client has been shut down or encountered an error before.");
+		}
 
-		final OutputStream os = this.socket.getOutputStream();
-		final MessageDigest md = (jobId == null || key == null) ? BlobUtils.createMessageDigest() :
-				null;
-		final byte[] buf = new byte[AbstractID.SIZE];
+		if (LOG.isDebugEnabled()) {
+			if (jobId == null) {
+				LOG.debug(String.format("PUT content addressable BLOB buffer (%d bytes) to %s",
+						len, socket.getLocalSocketAddress()));
+			} else {
+				LOG.debug(String.format("PUT BLOB buffer (%d bytes) under %s / \"%s\" to %s",
+						len, jobId, key, socket.getLocalSocketAddress()));
+			}
+		}
 
-		// Send the PUT header
-		sendPutHeader(os, jobId, key, buf);
+		try {
+			final OutputStream os = this.socket.getOutputStream();
+			final MessageDigest md = jobId == null ? BlobUtils.createMessageDigest() : null;
 
-		// Send the value in iterations of BUFFER_SIZE
-		int remainingBytes = value.length;
-		int bytesSent = 0;
+			// Send the PUT header
+			sendPutHeader(os, jobId, key);
 
-		while (remainingBytes > 0) {
+			// Send the value in iterations of BUFFER_SIZE
+			int remainingBytes = len;
 
-			final int bytesToSend = Math.min(BlobServer.BUFFER_SIZE, remainingBytes);
-			BlobServer.writeLength(bytesToSend, buf, os);
+			while (remainingBytes > 0) {
+				final int bytesToSend = Math.min(BUFFER_SIZE, remainingBytes);
+				writeLength(bytesToSend, os);
 
-			os.write(value, offset + bytesSent, bytesToSend);
+				os.write(value, offset, bytesToSend);
 
-			// Update the message digest if necessary
-			if (md != null) {
-				md.update(value, offset + bytesSent, bytesToSend);
-			}
+				// Update the message digest if necessary
+				if (md != null) {
+					md.update(value, offset, bytesToSend);
+				}
 
-			remainingBytes -= bytesToSend;
-			bytesSent += bytesToSend;
-		}
+				remainingBytes -= bytesToSend;
+				offset += bytesToSend;
+			}
+			// send -1 as the stream end
+			writeLength(-1, os);
 
-		if (md == null) {
-			return null;
+			// Receive blob key and compare
+			final InputStream is = this.socket.getInputStream();
+			return receivePutResponseAndCompare(is, md);
 		}
-
-		// Receive blob key and compare
-		final InputStream is = this.socket.getInputStream();
-		final BlobKey localKey = new BlobKey(md.digest());
-		final BlobKey remoteKey = BlobKey.readFromInputStream(is);
-
-		if (!localKey.equals(remoteKey)) {
-			throw new IOException("Detected data corruption during transfer");
+		catch (Throwable t) {
+			BlobUtils.closeSilently(socket, LOG);
+			throw new IOException("PUT operation failed: " + t.getMessage(), t);
 		}
-
-		return localKey;
 	}
 
 	/**
 	 * Uploads data from the given input stream to the BLOB server.
-	 * 
+	 *
 	 * @param jobId
 	 *        the ID of the job the BLOB belongs to or <code>null</code> to store the BLOB in a content-addressable
 	 *        manner
@@ -374,143 +416,261 @@ public final class BlobClient implements Closeable {
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while uploading the data to the BLOB server
 	 */
-	private BlobKey putInputStream(final JobID jobId, final String key, final InputStream inputStream)
-			throws IOException {
-
-		final OutputStream os = this.socket.getOutputStream();
-		final MessageDigest md = (jobId == null || key == null) ? BlobUtils.createMessageDigest
-				() : null;
-		final byte[] buf = new byte[AbstractID.SIZE];
-		final byte[] xferBuf = new byte[BlobServer.BUFFER_SIZE];
-
-		// Send the PUT header
-		sendPutHeader(os, jobId, key, buf);
-
-		while (true) {
+	private BlobKey putInputStream(JobID jobId, String key, InputStream inputStream) throws IOException {
+		if (this.socket.isClosed()) {
+			throw new IllegalStateException("BLOB Client is not connected. " +
+					"Client has been shut down or encountered an error before.");
+		}
 
-			final int read = inputStream.read(xferBuf);
-			if (read < 0) {
-				break;
+		if (LOG.isDebugEnabled()) {
+			if (jobId == null) {
+				LOG.debug(String.format("PUT content addressable BLOB stream to %s",
+						socket.getLocalSocketAddress()));
+			} else {
+				LOG.debug(String.format("PUT BLOB stream under %s / \"%s\" to %s",
+						jobId, key, socket.getLocalSocketAddress()));
 			}
-			if (read > 0) {
-				BlobServer.writeLength(read, buf, os);
-				os.write(xferBuf, 0, read);
-				if (md != null) {
-					md.update(xferBuf, 0, read);
+		}
+
+		try {
+			final OutputStream os = this.socket.getOutputStream();
+			final MessageDigest md = jobId == null ? BlobUtils.createMessageDigest() : null;
+			final byte[] xferBuf = new byte[BUFFER_SIZE];
+
+			// Send the PUT header
+			sendPutHeader(os, jobId, key);
+
+			while (true) {
+				final int read = inputStream.read(xferBuf);
+				if (read < 0) {
+					// we are done. send a -1 and be done
+					writeLength(-1, os);
+					break;
+				}
+				if (read > 0) {
+					writeLength(read, os);
+					os.write(xferBuf, 0, read);
+					if (md != null) {
+						md.update(xferBuf, 0, read);
+					}
 				}
 			}
+
+			// Receive blob key and compare
+			final InputStream is = this.socket.getInputStream();
+			return receivePutResponseAndCompare(is, md);
+		}
+		catch (Throwable t) {
+			BlobUtils.closeSilently(socket, LOG);
+			throw new IOException("PUT operation failed: " + t.getMessage(), t);
 		}
+	}
 
-		if (md == null) {
-			return null;
+	private BlobKey receivePutResponseAndCompare(InputStream is, MessageDigest md) throws IOException {
+		int response = is.read();
+		if (response < 0) {
+			throw new EOFException("Premature end of response");
 		}
+		else if (response == RETURN_OKAY) {
+			if (md == null) {
+				// not content addressable
+				return null;
+			}
 
-		// Receive blob key and compare
-		final InputStream is = this.socket.getInputStream();
-		final BlobKey localKey = new BlobKey(md.digest());
-		final BlobKey remoteKey = BlobKey.readFromInputStream(is);
+			BlobKey remoteKey = BlobKey.readFromInputStream(is);
+			BlobKey localKey = new BlobKey(md.digest());
 
-		if (!localKey.equals(remoteKey)) {
-			throw new IOException("Detected data corruption during transfer");
-		}
+			if (!localKey.equals(remoteKey)) {
+				throw new IOException("Detected data corruption during transfer");
+			}
 
-		return localKey;
+			return localKey;
+		}
+		else if (response == RETURN_ERROR) {
+			Throwable cause = readExceptionFromStream(is);
+			throw new IOException("Server side error: " + cause.getMessage(), cause);
+		}
+		else {
+			throw new IOException("Unrecognized response");
+		}
 	}
 
 	/**
-	 * Downloads the BLOB identified by the given job ID and key from the BLOB server. If no such BLOB exists on the
-	 * server, a {@link FileNotFoundException} is thrown.
-	 * 
+	 * Constructs and writes the header data for a PUT request to the given output stream.
+	 * NOTE: If the jobId and key are null, we send the data to the content addressable section.
+	 *
+	 * @param outputStream
+	 *        the output stream to write the PUT header data to
 	 * @param jobID
-	 *        the job ID identifying the BLOB to download
+	 *        the ID of the job the BLOB belongs to or <code>null</code> to indicate the upload of a
+	 *        content-addressable BLOB
 	 * @param key
-	 *        the key identifying the BLOB to download
-	 * @return an input stream to read the retrieved data from
+	 *        the key of the BLOB to upload or <code>null</code> to indicate the upload of a content-addressable BLOB
 	 * @throws IOException
-	 *         thrown if an I/O error occurs during the download
+	 *         thrown if an I/O error occurs while writing the header data to the output stream
 	 */
-	public InputStream get(final JobID jobID, final String key) throws IOException {
+	private void sendPutHeader(OutputStream outputStream, JobID jobID, String key) throws IOException {
+		// sanity check that either both are null or both are not null
+		if ((jobID != null || key != null) && !(jobID != null && key != null)) {
+			throw new IllegalArgumentException();
+		}
 
-		if (key.length() > BlobServer.MAX_KEY_LENGTH) {
-			throw new IllegalArgumentException("Keys must not be longer than " + BlobServer.MAX_KEY_LENGTH);
+		// Signal type of operation
+		outputStream.write(PUT_OPERATION);
+
+		// Check if PUT should be done in content-addressable manner
+		if (jobID == null) {
+			outputStream.write(CONTENT_ADDRESSABLE);
 		}
+		else {
+			outputStream.write(NAME_ADDRESSABLE);
+			// Send job ID and the key
+			byte[] idBytes = jobID.getBytes();
+			byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET);
+			outputStream.write(idBytes);
+			writeLength(keyBytes.length, outputStream);
+			outputStream.write(keyBytes);
+		}
+	}
 
-		final OutputStream os = this.socket.getOutputStream();
-		final byte[] buf = new byte[AbstractID.SIZE];
+	// --------------------------------------------------------------------------------------------
+	//  DELETE
+	// --------------------------------------------------------------------------------------------
 
-		// Send GET header
-		sendGetHeader(os, jobID, key, null, buf);
+	/**
+	 * Deletes the BLOB identified by the given BLOB key from the BLOB server.
+	 *
+	 * @param key
+	 *        the key to identify the BLOB
+	 * @throws IOException
+	 *         thrown if an I/O error occurs while transferring the request to the BLOB server
+	 */
+	public void delete(BlobKey key) throws IOException {
+		if (key == null) {
+			throw new IllegalArgumentException("BLOB key must not be null");
+		}
 
-		return new BlobInputStream(this.socket.getInputStream(), null, buf);
+		deleteInternal(null, null, key);
 	}
 
 	/**
-	 * Downloads the BLOB identified by the given BLOB key from the BLOB server. If no such BLOB exists on the server, a
-	 * {@link FileNotFoundException} is thrown.
-	 * 
-	 * @param blobKey
-	 *        the BLOB key identifying the BLOB to download
-	 * @return an input stream to read the retrieved data from
+	 * Deletes the BLOB identified by the given job ID and key from the BLOB server.
+	 *
+	 * @param jobId
+	 *        the job ID to identify the BLOB
+	 * @param key
+	 *        the key to identify the BLOB
 	 * @throws IOException
-	 *         thrown if an I/O error occurs during the download
+	 *         thrown if an I/O error occurs while transferring the request to the BLOB server
 	 */
-	public InputStream get(final BlobKey blobKey) throws IOException {
+	public void delete(JobID jobId, String key) throws IOException {
+		if (jobId == null) {
+			throw new IllegalArgumentException("JobID must not be null");
+		}
+		if (key == null) {
+			throw new IllegalArgumentException("Key must not be null");
+		}
+		if (key.length() > MAX_KEY_LENGTH) {
+			throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH);
+		}
 
-		final OutputStream os = this.socket.getOutputStream();
-		final byte[] buf = new byte[AbstractID.SIZE];
+		deleteInternal(jobId, key, null);
+	}
 
-		// Send GET header
-		sendGetHeader(os, null, null, blobKey, buf);
+	/**
+	 * Deletes all BLOBs belonging to the job with the given ID from the BLOB server.
+	 *
+	 * @param jobId
+	 *        the job ID to identify the BLOBs to be deleted
+	 * @throws IOException
+	 *         thrown if an I/O error occurs while transferring the request to the BLOB server
+	 */
+	public void deleteAll(JobID jobId) throws IOException {
+		if (jobId == null) {
+			throw new IllegalArgumentException("Argument jobID must not be null");
+		}
 
-		return new BlobInputStream(this.socket.getInputStream(), blobKey, buf);
+		deleteInternal(jobId, null, null);
 	}
 
 	/**
-	 * Constructs and writes the header data for a GET operation to the given output stream.
-	 * 
-	 * @param outputStream
-	 *        the output stream to write the header data to
-	 * @param jobID
-	 *        the job ID identifying the BLOB to download or <code>null</code> to indicate the BLOB key should be used
-	 *        to identify the BLOB on the server instead
-	 * @param key
-	 *        the key identifying the BLOB to download or <code>null</code> to indicate the BLOB key should be used to
-	 *        identify the BLOB on the server instead
-	 * @param key2
-	 *        the BLOB key to identify the BLOB to download if either the job ID or the regular key are
-	 *        <code>null</code>
-	 * @param buf
-	 *        auxiliary buffer used for data serialization
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while writing the header data to the output stream
+	 * Delete one or multiple BLOBs from the BLOB server.
+	 *
+	 * @param jobId The job ID to identify the BLOB(s) to be deleted.
+	 * @param key The key to identify the specific BLOB to delete or <code>null</code> to delete
+	 *            all BLOBs associated with the job id.
+	 * @param bKey The blob key to identify a specific content addressable BLOB. This parameter
+	 *             is exclusive with jobId and key.
+	 * @throws IOException Thrown if an I/O error occurs while transferring the request to the BLOB server.
 	 */
-	private void sendGetHeader(final OutputStream outputStream, final JobID jobID, final String key,
-			final BlobKey key2, final byte[] buf) throws IOException {
+	private void deleteInternal(JobID jobId, String key, BlobKey bKey) throws IOException {
+		if ((jobId != null && bKey != null) || (jobId == null && bKey == null)) {
+			throw new IllegalArgumentException();
+		}
 
-		// Signal type of operation
-		outputStream.write(BlobServer.GET_OPERATION);
+		try {
+			final OutputStream outputStream = this.socket.getOutputStream();
+			final InputStream inputStream = this.socket.getInputStream();
 
-		// Check if GET should be done in content-addressable manner
-		if (jobID == null || key == null) {
-			outputStream.write(1);
-			key2.writeToOutputStream(outputStream);
-		} else {
-			outputStream.write(0);
-			// Send job ID
-			final ByteBuffer bb = ByteBuffer.wrap(buf);
-			jobID.write(bb);
-			outputStream.write(buf);
-
-			// Send the key
-			byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET);
-			BlobServer.writeLength(keyBytes.length, buf, outputStream);
-			outputStream.write(keyBytes);
+			// Signal type of operation
+			outputStream.write(DELETE_OPERATION);
+
+			// Check if DELETE should be done in content-addressable manner
+			if (jobId == null) {
+				// delete blob key
+				outputStream.write(CONTENT_ADDRESSABLE);
+				bKey.writeToOutputStream(outputStream);
+			}
+			else if (key != null) {
+				// delete BLOB for jobID and name key
+				outputStream.write(NAME_ADDRESSABLE);
+				// Send job ID and the key
+				byte[] idBytes = jobId.getBytes();
+				byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET);
+				outputStream.write(idBytes);
+				writeLength(keyBytes.length, outputStream);
+				outputStream.write(keyBytes);
+			}
+			else {
+				// delete all blobs for JobID
+				outputStream.write(JOB_ID_SCOPE);
+				byte[] idBytes = jobId.getBytes();
+				outputStream.write(idBytes);
+			}
+
+			int response = inputStream.read();
+			if (response < 0) {
+				throw new EOFException("Premature end of response");
+			}
+			if (response == RETURN_ERROR) {
+				Throwable cause = readExceptionFromStream(inputStream);
+				throw new IOException("Server side error: " + cause.getMessage(), cause);
+			}
+			else if (response != RETURN_OKAY) {
+				throw new IOException("Unrecognized response");
+			}
+		}
+		catch (Throwable t) {
+			BlobUtils.closeSilently(socket, LOG);
+			throw new IOException("DELETE operation failed: " + t.getMessage(), t);
 		}
 	}
 
-	@Override
-	public void close() throws IOException {
+	// --------------------------------------------------------------------------------------------
+	//  Miscellaneous
+	// --------------------------------------------------------------------------------------------
 
-		this.socket.close();
+	private static Throwable readExceptionFromStream(InputStream in) throws IOException {
+		int len = readLength(in);
+		byte[] bytes = new byte[len];
+		readFully(in, bytes, 0, len, "Error message");
+
+		try {
+			return (Throwable) InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
+		}
+		catch (ClassNotFoundException e) {
+			// should never occur
+			throw new IOException("Could not transfer error message", e);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobConnection.java
deleted file mode 100644
index 3b7a31b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobConnection.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.blob;
-
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A BLOB connection handles a series of requests from a particular BLOB client.
- * <p>
- * This class it thread-safe.
- */
-class BlobConnection extends Thread {
-
-	/**
-	 * The log object used for debugging.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(BlobConnection.class);
-
-	/**
-	 * The socket to communicate with the client.
-	 */
-	private final Socket clientSocket;
-
-	/**
-	 * The BLOB server.
-	 */
-	private final BlobServer blobServer;
-
-	/**
-	 * Creates a new BLOB connection for a client request
-	 * 
-	 * @param clientSocket
-	 *        the socket to read/write data
-	 * @param blobServer
-	 *        the BLOB server
-	 */
-	BlobConnection(final Socket clientSocket, final BlobServer blobServer) {
-		super("BLOB connection for " + clientSocket.getRemoteSocketAddress().toString());
-
-		this.clientSocket = clientSocket;
-		this.blobServer = blobServer;
-	}
-
-	@Override
-	public void run() {
-
-		try {
-
-			final InputStream inputStream = this.clientSocket.getInputStream();
-			final OutputStream outputStream = this.clientSocket.getOutputStream();
-			final byte[] buffer = new byte[BlobServer.BUFFER_SIZE];
-
-			while (true) {
-
-				// Read the requested operation
-				final int operation = inputStream.read();
-				if (operation < 0) {
-					return;
-				}
-
-				switch (operation) {
-				case BlobServer.PUT_OPERATION:
-					put(inputStream, outputStream, buffer);
-					break;
-				case BlobServer.GET_OPERATION:
-					get(inputStream, outputStream, buffer);
-					break;
-				case BlobServer.DELETE_OPERATION:
-					delete(inputStream, buffer);
-					break;
-				default:
-					throw new IOException("Unknown operation " + operation);
-				}
-			}
-
-		} catch (IOException ioe) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Error while executing BLOB connection.", ioe);
-			}
-		} finally {
-			closeSilently(this.clientSocket);
-		}
-	}
-
-	/**
-	 * Handles an incoming GET request from a BLOB client.
-	 * 
-	 * @param inputStream
-	 *        the input stream to read incoming data from
-	 * @param outputStream
-	 *        the output stream to send data back to the client
-	 * @param buf
-	 *        an auxiliary buffer for data serialization/deserialization
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while reading/writing data from/to the respective streams
-	 */
-	private void get(final InputStream inputStream, final OutputStream outputStream, final byte[] buf)
-			throws IOException {
-
-		File blob = null;
-
-		final int contentAdressable = inputStream.read();
-		if (contentAdressable < 0) {
-			throw new EOFException("Expected GET header");
-		}
-
-		if (contentAdressable == 0) {
-			// Receive the job ID
-			BlobServer.readFully(inputStream, buf, 0, JobID.SIZE);
-			final ByteBuffer bb = ByteBuffer.wrap(buf);
-			final JobID jobID = JobID.fromByteBuffer(bb);
-			// Receive the key
-			final String key = readKey(buf, inputStream);
-			blob = this.blobServer.getStorageLocation(jobID, key);
-		} else {
-			final BlobKey key = BlobKey.readFromInputStream(inputStream);
-			blob = blobServer.getStorageLocation(key);
-		}
-
-		// Check if BLOB exists
-		if (!blob.exists()) {
-			BlobServer.writeLength(-1, buf, outputStream);
-			return;
-		}
-
-		BlobServer.writeLength((int) blob.length(), buf, outputStream);
-		FileInputStream fis = null;
-		try {
-			fis = new FileInputStream(blob);
-
-			while (true) {
-
-				final int read = fis.read(buf);
-				if (read < 0) {
-					break;
-				}
-				outputStream.write(buf, 0, read);
-			}
-
-		} finally {
-			if (fis != null) {
-				fis.close();
-			}
-		}
-	}
-
-	/**
-	 * Handles an incoming PUT request from a BLOB client.
-	 * 
-	 * @param inputStream
-	 *        the input stream to read incoming data from
-	 * @param outputStream
-	 *        the output stream to send data back to the client
-	 * @param buf
-	 *        an auxiliary buffer for data serialization/deserialization
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while reading/writing data from/to the respective streams
-	 */
-	private void put(final InputStream inputStream, final OutputStream outputStream, final byte[] buf)
-			throws IOException {
-
-		JobID jobID = null;
-		String key = null;
-		MessageDigest md = null;
-		final int contentAdressable = inputStream.read();
-		if (contentAdressable < 0) {
-			throw new EOFException("Expected PUT header");
-		}
-
-		if (contentAdressable == 0) {
-			// Receive the job ID
-			BlobServer.readFully(inputStream, buf, 0, JobID.SIZE);
-			final ByteBuffer bb = ByteBuffer.wrap(buf);
-			jobID = JobID.fromByteBuffer(bb);
-			// Receive the key
-			key = readKey(buf, inputStream);
-		} else {
-			md = BlobUtils.createMessageDigest();
-		}
-
-		File incomingFile = null;
-		FileOutputStream fos = null;
-
-		try {
-			incomingFile = blobServer.getTemporaryFilename();
-			fos = new FileOutputStream(incomingFile);
-
-			while (true) {
-
-				final int bytesExpected = BlobServer.readLength(buf, inputStream);
-				if (bytesExpected > BlobServer.BUFFER_SIZE) {
-					throw new IOException("Unexpected number of incoming bytes: " + bytesExpected);
-				}
-
-				BlobServer.readFully(inputStream, buf, 0, bytesExpected);
-				fos.write(buf, 0, bytesExpected);
-
-				if (md != null) {
-					md.update(buf, 0, bytesExpected);
-				}
-
-				if (bytesExpected < BlobServer.BUFFER_SIZE) {
-					break;
-				}
-			}
-
-			fos.close();
-			fos = null;
-
-			if (contentAdressable == 0) {
-				final File storageFile = this.blobServer.getStorageLocation(jobID, key);
-				incomingFile.renameTo(storageFile);
-				incomingFile = null;
-			} else {
-				final BlobKey blobKey = new BlobKey(md.digest());
-				final File storageFile = blobServer.getStorageLocation(blobKey);
-				incomingFile.renameTo(storageFile);
-				incomingFile = null;
-
-				// Return computed key to client for validation
-				blobKey.writeToOutputStream(outputStream);
-			}
-		} finally {
-			if (fos != null) {
-				fos.close();
-			}
-			if (incomingFile != null) {
-				incomingFile.delete();
-			}
-		}
-	}
-
-	/**
-	 * Handles an incoming DELETE request from a BLOB client.
-	 * 
-	 * @param inputStream
-	 *        the input stream to read the request from.
-	 * @param buf
-	 *        an auxiliary buffer for data deserialization
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while reading the request data from the input stream
-	 */
-	private void delete(final InputStream inputStream, final byte[] buf) throws IOException {
-
-		// Receive the job ID
-		BlobServer.readFully(inputStream, buf, 0, JobID.SIZE);
-		final ByteBuffer bb = ByteBuffer.wrap(buf);
-		final JobID jobID = JobID.fromByteBuffer(bb);
-		String key = null;
-
-		final int r = inputStream.read();
-		if (r < 0) {
-			throw new EOFException();
-		}
-		if (r > 0) {
-			// Delete individual BLOB
-			// Receive the key
-			key = readKey(buf, inputStream);
-
-			final File blob = this.blobServer.getStorageLocation(jobID, key);
-			blob.delete();
-
-		} else {
-			// Delete all BLOBs for this job
-			blobServer.deleteJobDirectory(jobID);
-		}
-	}
-
-	/**
-	 * Auxiliary method to silently close a {@link Socket}.
-	 * 
-	 * @param socket
-	 *        the socket to close
-	 */
-	static void closeSilently(final Socket socket) {
-
-		try {
-			if (socket != null) {
-				socket.close();
-			}
-		} catch (IOException ioe) {
-		}
-	}
-
-	/**
-	 * Reads the key of a BLOB from the given input stream.
-	 * 
-	 * @param buf
-	 *        auxiliary buffer to data deserialization
-	 * @param inputStream
-	 *        the input stream to read the key from
-	 * @return the key of a BLOB
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while reading the key data from the input stream
-	 */
-	private static String readKey(final byte[] buf,
-			final InputStream inputStream) throws IOException {
-
-		final int keyLength = BlobServer.readLength(buf, inputStream);
-		if (keyLength > BlobServer.MAX_KEY_LENGTH) {
-			throw new IOException("Unexpected key length " + keyLength);
-		}
-
-		BlobServer.readFully(inputStream, buf, 0, keyLength);
-
-		return new String(buf, 0, keyLength, BlobUtils.DEFAULT_CHARSET);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
index 3654f8f..a89a461 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.security.MessageDigest;
 
+import static org.apache.flink.runtime.blob.BlobUtils.readLength;
+
 /**
  * The BLOB input stream is a special implementation of an {@link InputStream} to read the results of a GET operation
  * from the BLOB server.
@@ -63,15 +65,13 @@ final class BlobInputStream extends InputStream {
 	 *        the underlying input stream to read from
 	 * @param blobKey
 	 *        the expected BLOB key for content-addressable BLOBs, <code>null</code> for non-content-addressable BLOBs.
-	 * @param buf
-	 *        auxiliary buffer to read the meta data from the BLOB server
 	 * @throws IOException
 	 *         throws if an I/O error occurs while reading the BLOB data from the BLOB server
 	 */
-	BlobInputStream(final InputStream wrappedInputStream, final BlobKey blobKey, final byte[] buf) throws IOException {
+	BlobInputStream(final InputStream wrappedInputStream, final BlobKey blobKey) throws IOException {
 		this.wrappedInputStream = wrappedInputStream;
 		this.blobKey = blobKey;
-		this.bytesToReceive = BlobServer.readLength(buf, wrappedInputStream);
+		this.bytesToReceive = readLength(wrappedInputStream);
 		if (this.bytesToReceive < 0) {
 			throw new FileNotFoundException();
 		}
@@ -157,7 +157,7 @@ final class BlobInputStream extends InputStream {
 
 	@Override
 	public int available() throws IOException {
-		return 0;
+		return this.bytesToReceive - this.bytesReceived;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
index e3d237d..bd254dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.blob;
 
 import java.io.EOFException;
@@ -142,7 +141,7 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
 		while (bytesRead < BlobKey.SIZE) {
 			final int read = inputStream.read(key, bytesRead, BlobKey.SIZE - bytesRead);
 			if (read < 0) {
-				throw new EOFException();
+				throw new EOFException("Read an incomplete BLOB key");
 			}
 			bytesRead += read;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index b27af03..c0e81f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -18,14 +18,15 @@
 
 package org.apache.flink.runtime.blob;
 
-import java.io.EOFException;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.ServerSocket;
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -42,61 +43,32 @@ import org.slf4j.LoggerFactory;
  * spawning threads to handle these requests. Furthermore, it takes care of creating the directory structure to store
  * the BLOBs or temporarily cache them.
  */
-public final class BlobServer extends Thread implements BlobService {
+public class BlobServer extends Thread implements BlobService {
 
-	/**
-	 * The log object used for debugging.
-	 */
+	/** The log object used for debugging. */
 	private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);
 
-	/**
-	 * The buffer size in bytes for network transfers.
-	 */
-	static final int BUFFER_SIZE = 4096;
-
-	/**
-	 * The maximum key length allowed for storing BLOBs.
-	 */
-	static final int MAX_KEY_LENGTH = 64;
-
-	/**
-	 * Internal code to identify a PUT operation.
-	 */
-	static final byte PUT_OPERATION = 0;
-
-	/**
-	 * Internal code to identify a GET operation.
-	 */
-	static final byte GET_OPERATION = 1;
-
-	/**
-	 * Internal code to identify a DELETE operation.
-	 */
-	static final byte DELETE_OPERATION = 2;
-
-	/**
-	 * Counter to generate unique names for temporary files.
-	 */
+	/** Counter to generate unique names for temporary files. */
 	private final AtomicInteger tempFileCounter = new AtomicInteger(0);
 
-	/**
-	 * The server socket listening for incoming connections.
-	 */
+	/** The server socket listening for incoming connections. */
 	private final ServerSocket serverSocket;
 
-	/**
-	 * Indicates whether a shutdown of server component has been requested.
-	 */
-	private AtomicBoolean shutdownRequested = new AtomicBoolean();
+	/** Indicates whether a shutdown of server component has been requested. */
+	private final AtomicBoolean shutdownRequested = new AtomicBoolean();
 
 	/** Shutdown hook thread to ensure deletion of the storage directory. */
 	private final Thread shutdownHook;
 
-	/**
-	 * Is the root directory for file storage
-	 */
+	/** Is the root directory for file storage */
 	private final File storageDir;
 
+	/** Set of currently running threads */
+	private final Set<BlobServerConnection> activeConnections = new HashSet<BlobServerConnection>();
+
+	/** The maximum number of concurrent connections */
+	private final int maxConnections;
+
 	/**
 	 * Instantiates a new BLOB server and binds it to a free network port.
 	 * 
@@ -105,38 +77,57 @@ public final class BlobServer extends Thread implements BlobService {
 	 */
 	public BlobServer(Configuration config) throws IOException {
 
+		// configure and create the storage directory
 		String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
 		this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
 		LOG.info("Created BLOB server storage directory {}", storageDir);
 
+		// configure the maximum number of concurrent connections
+		final int maxConnections = config.getInteger(
+				ConfigConstants.BLOB_FETCH_CONCURRENT_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT);
+		if (maxConnections >= 1) {
+			this.maxConnections = maxConnections;
+		}
+		else {
+			LOG.warn("Invalid value for maximum connections in BLOB server: {}. Using default value of {}",
+					maxConnections, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT);
+			this.maxConnections = ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT;
+		}
+
+		// configure the backlog of connections
+		int backlog = config.getInteger(ConfigConstants.BLOB_FETCH_BACKLOG_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG);
+		if (backlog < 1) {
+			LOG.warn("Invalid value for BLOB connection backlog: {}. Using default value of {}",
+					backlog, ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG);
+			backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
+		}
+
 		// Add shutdown hook to delete storage directory
 		this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
 
+		// start the server
 		try {
-			this.serverSocket = new ServerSocket(0);
-
-			start();
-
-			if (LOG.isInfoEnabled()) {
-				LOG.info(String.format("Started BLOB server on port %d",
-						this.serverSocket.getLocalPort()));
-			}
+			this.serverSocket = new ServerSocket(0, backlog);
 		}
 		catch (IOException e) {
-			throw new IOException("Could not create BlobServer with random port.", e);
+			throw new IOException("Could not create BlobServer with automatic port choice.", e);
 		}
-	}
 
-	/**
-	 * Returns the network port the BLOB server is bound to. The return value of this method is undefined after the BLOB
-	 * server has been shut down.
-	 * 
-	 * @return the network port the BLOB server is bound to
-	 */
-	public int getServerPort() {
-		return this.serverSocket.getLocalPort();
+		// start the server thread
+		setName("BLOB Server listener at " + getPort());
+		setDaemon(true);
+		start();
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Started BLOB server at {}:{} - max concurrent requests: {} - max backlog: {}",
+					serverSocket.getInetAddress().getHostAddress(), getPort(), maxConnections, backlog);
+		}
 	}
 
+	// --------------------------------------------------------------------------------------------
+	//  Path Accessors
+	// --------------------------------------------------------------------------------------------
+
 	/**
 	 * Returns a file handle to the file associated with the given blob key on the blob
 	 * server.
@@ -174,7 +165,7 @@ public final class BlobServer extends Thread implements BlobService {
 	 * 
 	 * @return a temporary file inside the BLOB server's incoming directory
 	 */
-	File getTemporaryFilename() {
+	File createTemporaryFilename() {
 		return new File(BlobUtils.getIncomingDirectory(storageDir),
 				String.format("temp-%08d", tempFileCounter.getAndIncrement()));
 	}
@@ -183,7 +174,26 @@ public final class BlobServer extends Thread implements BlobService {
 	public void run() {
 		try {
 			while (!this.shutdownRequested.get()) {
-				new BlobConnection(this.serverSocket.accept(), this).start();
+				BlobServerConnection conn = new BlobServerConnection(serverSocket.accept(), this);
+				try {
+					synchronized (activeConnections) {
+						while (activeConnections.size() >= maxConnections) {
+							activeConnections.wait(2000);
+						}
+						activeConnections.add(conn);
+					}
+
+					conn.start();
+					conn = null;
+				}
+				finally {
+					if (conn != null) {
+						conn.close();
+						synchronized (activeConnections) {
+							activeConnections.remove(conn);
+						}
+					}
+				}
 			}
 		}
 		catch (Throwable t) {
@@ -206,6 +216,10 @@ public final class BlobServer extends Thread implements BlobService {
 			catch (IOException ioe) {
 				LOG.debug("Error while closing the server socket.", ioe);
 			}
+
+			// wake the thread up, in case it is waiting on some operation
+			interrupt();
+
 			try {
 				join();
 			}
@@ -213,6 +227,16 @@ public final class BlobServer extends Thread implements BlobService {
 				LOG.debug("Error while waiting for this thread to die.", ie);
 			}
 
+			synchronized (activeConnections) {
+				if (!activeConnections.isEmpty()) {
+					for (BlobServerConnection conn : activeConnections) {
+						LOG.debug("Shutting down connection " + conn.getName());
+						conn.close();
+					}
+					activeConnections.clear();
+				}
+			}
+
 			// Clean up the storage directory
 			try {
 				FileUtils.deleteDirectory(storageDir);
@@ -255,8 +279,7 @@ public final class BlobServer extends Thread implements BlobService {
 		final File localFile = BlobUtils.getStorageLocation(storageDir, requiredBlob);
 
 		if (!localFile.exists()) {
-			throw new FileNotFoundException("File " + localFile.getCanonicalPath() + " does " +
-					"not exist.");
+			throw new FileNotFoundException("File " + localFile.getCanonicalPath() + " does not exist.");
 		} else {
 			return localFile.toURI().toURL();
 		}
@@ -266,15 +289,17 @@ public final class BlobServer extends Thread implements BlobService {
 	 * This method deletes the file associated to the blob key if it exists in the local storage
 	 * of the blob server.
 	 *
-	 * @param blobKey associated with the file to be deleted
+	 * @param key associated with the file to be deleted
 	 * @throws IOException
 	 */
 	@Override
-	public void delete(BlobKey blobKey) throws IOException {
-		final File localFile = BlobUtils.getStorageLocation(storageDir, blobKey);
+	public void delete(BlobKey key) throws IOException {
+		final File localFile = BlobUtils.getStorageLocation(storageDir, key);
 
 		if (localFile.exists()) {
-			localFile.delete();
+			if (!localFile.delete()) {
+				LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
+			}
 		}
 	}
 
@@ -285,95 +310,35 @@ public final class BlobServer extends Thread implements BlobService {
 	 */
 	@Override
 	public int getPort() {
-		return getServerPort();
+		return this.serverSocket.getLocalPort();
 	}
 
 	/**
-	 * Auxiliary method to write the length of an upcoming data chunk to an
-	 * output stream.
+	 * Tests whether the BLOB server has been requested to shut down.
 	 *
-	 * @param length
-	 *        the length of the upcoming data chunk in bytes
-	 * @param buf
-	 *        the byte buffer to use for the integer serialization
-	 * @param outputStream
-	 *        the output stream to write the length to
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while writing to the output
-	 *         stream
+	 * @return True, if the server has been requested to shut down, false otherwise.
 	 */
-	static void writeLength(final int length, final byte[] buf,
-							final OutputStream outputStream) throws IOException {
-
-		buf[0] = (byte) (length & 0xff);
-		buf[1] = (byte) ((length >> 8) & 0xff);
-		buf[2] = (byte) ((length >> 16) & 0xff);
-		buf[3] = (byte) ((length >> 24) & 0xff);
-
-		outputStream.write(buf, 0, 4);
+	public boolean isShutdown() {
+		return this.shutdownRequested.get();
 	}
 
 	/**
-	 * Auxiliary method to read the length of an upcoming data chunk from an
-	 * input stream.
-	 *
-	 * @param buf
-	 *        the byte buffer to use for the integer deserialization
-	 * @param inputStream
-	 *        the input stream to read the length from
-	 * @return the length of the upcoming data chunk in bytes
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while reading from the input
-	 *         stream
+	 * Access to the server socket, for testing
 	 */
-	static int readLength(final byte[] buf, final InputStream inputStream)
-			throws IOException {
-
-		int bytesRead = 0;
-		while (bytesRead < 4) {
-			final int read = inputStream.read(buf, bytesRead, 4 - bytesRead);
-			if (read < 0) {
-				throw new EOFException();
-			}
-			bytesRead += read;
-		}
-
-		bytesRead = buf[0] & 0xff;
-		bytesRead |= (buf[1] & 0xff) << 8;
-		bytesRead |= (buf[2] & 0xff) << 16;
-		bytesRead |= (buf[3] & 0xff) << 24;
-
-		return bytesRead;
+	ServerSocket getServerSocket() {
+		return this.serverSocket;
 	}
 
-	/**
-	 * Auxiliary method to read a particular number of bytes from an input stream. This method blocks until the
-	 * requested number of bytes have been read from the stream. If the stream cannot offer enough data, an
-	 * {@link EOFException} is thrown.
-	 *
-	 * @param inputStream
-	 *        the input stream to read the data from
-	 * @param buf
-	 *        the buffer to store the read data
-	 * @param off
-	 *        the offset inside the buffer
-	 * @param len
-	 *        the number of bytes to read from the stream
-	 * @throws IOException
-	 *         thrown if I/O error occurs while reading from the stream or the stream cannot offer enough data
-	 */
-	static void readFully(final InputStream inputStream,
-						final byte[] buf, final int off, final int len) throws IOException {
-
-		int bytesRead = 0;
-		while (bytesRead < len) {
+	void unregisterConnection(BlobServerConnection conn) {
+		synchronized (activeConnections) {
+			activeConnections.remove(conn);
+			activeConnections.notifyAll();
+		}
+	}
 
-			final int read = inputStream.read(buf, off + bytesRead, len
-					- bytesRead);
-			if (read < 0) {
-				throw new EOFException();
-			}
-			bytesRead += read;
+	List<BlobServerConnection> getCurrentyActiveConnections() {
+		synchronized (activeConnections) {
+			return new ArrayList<BlobServerConnection>(activeConnections);
 		}
 	}
 }


Mime
View raw message