flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [14/14] flink git commit: [FLINK-7068][blob] Introduce permanent and transient BLOB keys
Date Thu, 05 Oct 2017 14:07:07 GMT
[FLINK-7068][blob] Introduce permanent and transient BLOB keys

[FLINK-7068][blob] address PR review comments, part 1

[FLINK-7068][blob] create a common base class for the BLOB caches

[FLINK-7068][blob] update some comments

[FLINK-7068][blob] integrate the BLOB type into the BlobKey

[FLINK-7068][blob] rename a few methods for better consistency

[FLINK-7068][blob] fix Blob*DeleteTest not working as documented in one test

[FLINK-7068][blob] add checks for jobId being null in PermanentBlobCache

[FLINK-7068][blob] implement get-and-delete logic for transient BLOBs

Transient BLOB files are deleted on the BlobServer upon first access from a
cache. Therefore, we do not need the DELETE operations anymore, aside from
deleting the file from the local cache (for now).

[FLINK-7068][blob] address PR comments, part 2

[FLINK-7068][blob] separate permanent and transient BLOB keys

* create PermanentBlobKey and TransientBlobKey (inheriting from BlobKey) and
  forbid using transient BLOBs with permanent caches and vice versa
* make BlobKey package-private, similarly for the BlobType which is now
  reflected by the two BlobKey sub-classes
-> this gives a cleaner interface for the user

This closes #4358.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84a07a34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84a07a34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84a07a34

Branch: refs/heads/master
Commit: 84a07a34ac22af14f2dd0319447ca5f45de6d0bb
Parents: b57330d
Author: Nico Kruber <nico@data-artisans.com>
Authored: Wed Sep 20 12:05:25 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Oct 5 16:06:27 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/hdfstests/HDFSTest.java    |  44 +--
 .../flink/runtime/blob/AbstractBlobCache.java   | 241 +++++++++++++
 .../apache/flink/runtime/blob/BlobCache.java    |  80 -----
 .../flink/runtime/blob/BlobCacheService.java    |  96 +++++
 .../apache/flink/runtime/blob/BlobClient.java   | 190 +++-------
 .../flink/runtime/blob/BlobInputStream.java     |  31 +-
 .../org/apache/flink/runtime/blob/BlobKey.java  | 153 ++++++--
 .../apache/flink/runtime/blob/BlobServer.java   |  96 +++--
 .../runtime/blob/BlobServerConnection.java      | 214 +++++-------
 .../flink/runtime/blob/BlobServerProtocol.java  |  35 +-
 .../apache/flink/runtime/blob/BlobService.java  |   4 +-
 .../apache/flink/runtime/blob/BlobUtils.java    |   2 +-
 .../flink/runtime/blob/FileSystemBlobStore.java |   5 +-
 .../flink/runtime/blob/PermanentBlobCache.java  | 289 ++++-----------
 .../flink/runtime/blob/PermanentBlobKey.java    |  45 +++
 .../runtime/blob/PermanentBlobService.java      |   8 +-
 .../flink/runtime/blob/TransientBlobCache.java  | 203 ++---------
 .../flink/runtime/blob/TransientBlobKey.java    |  45 +++
 .../runtime/blob/TransientBlobService.java      |  41 ++-
 .../apache/flink/runtime/client/JobClient.java  |  14 +-
 .../librarycache/BlobLibraryCacheManager.java   |  18 +-
 .../FallbackLibraryCacheManager.java            |   6 +-
 .../librarycache/LibraryCacheManager.java       |   6 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  10 +-
 .../runtime/executiongraph/JobInformation.java  |   8 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |  13 +-
 .../slots/ActorTaskManagerGateway.java          |  12 +-
 .../jobmanager/slots/TaskManagerGateway.java    |   6 +-
 .../jobmaster/RpcTaskManagerGateway.java        |   6 +-
 .../jobmaster/message/ClassloadingProps.java    |   8 +-
 .../handler/legacy/TaskManagerLogHandler.java   |  14 +-
 .../taskexecutor/JobManagerConnection.java      |  12 +-
 .../runtime/taskexecutor/TaskExecutor.java      |  16 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  16 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +-
 .../runtime/messages/JobManagerMessages.scala   |   4 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  10 +-
 .../runtime/blob/BlobCacheCleanupTest.java      | 104 +++---
 .../runtime/blob/BlobCacheCorruptionTest.java   |  67 ++--
 .../flink/runtime/blob/BlobCacheDeleteTest.java | 176 ++++------
 .../flink/runtime/blob/BlobCacheGetTest.java    | 306 +++++++++++-----
 .../flink/runtime/blob/BlobCachePutTest.java    | 350 ++++++++++++-------
 .../runtime/blob/BlobCacheRecoveryTest.java     |  36 +-
 .../runtime/blob/BlobCacheRetriesTest.java      |  35 +-
 .../runtime/blob/BlobCacheSuccessTest.java      |  36 +-
 .../flink/runtime/blob/BlobClientTest.java      | 117 ++++---
 .../apache/flink/runtime/blob/BlobKeyTest.java  | 128 ++++++-
 .../runtime/blob/BlobServerCorruptionTest.java  |   7 +-
 .../runtime/blob/BlobServerDeleteTest.java      |  81 +++--
 .../flink/runtime/blob/BlobServerGetTest.java   | 101 +++---
 .../flink/runtime/blob/BlobServerPutTest.java   | 248 +++++++------
 .../runtime/blob/BlobServerRecoveryTest.java    |  27 +-
 .../flink/runtime/blob/BlobUtilsTest.java       |  12 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |   6 +-
 .../TaskDeploymentDescriptorTest.java           |   6 +-
 .../BlobLibraryCacheManagerTest.java            |  40 +--
 .../BlobLibraryCacheRecoveryITCase.java         |  17 +-
 .../executiongraph/FailoverRegionTest.java      |  18 +-
 .../executiongraph/GlobalModVersionTest.java    |   6 +-
 .../IndividualRestartsConcurrencyTest.java      |   6 +-
 .../PipelinedRegionFailoverConcurrencyTest.java |   6 +-
 .../RestartPipelinedRegionStrategyTest.java     |  18 +-
 .../utils/SimpleAckingTaskManagerGateway.java   |   6 +-
 .../jobmanager/JobManagerCleanupITCase.java     |   8 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java |   4 +-
 .../legacy/TaskManagerLogHandlerTest.java       |   4 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |  32 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  18 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  54 +--
 .../flink/runtime/taskmanager/TaskStopTest.java |  12 +-
 .../flink/runtime/taskmanager/TaskTest.java     |  66 ++--
 .../runtime/util/JvmExitOnFatalErrorTest.java   |  16 +-
 .../partitioner/RescalePartitionerTest.java     |   6 +-
 .../runtime/tasks/BlockingCheckpointsTest.java  |  18 +-
 .../tasks/InterruptSensitiveRestoreTest.java    |  18 +-
 .../tasks/StreamTaskTerminationTest.java        |  18 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  18 +-
 77 files changed, 2298 insertions(+), 1957 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index 92f8413..e4b907a 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.io.AvroOutputFormat;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -255,21 +254,16 @@ public class HDFSTest {
 		org.apache.flink.configuration.Configuration
 			config = new org.apache.flink.configuration.Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-		config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
 
-		BlobStoreService blobStoreService = null;
+		BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
 
 		try {
-			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
-
 			BlobServerRecoveryTest.testBlobServerRecovery(config, blobStoreService);
 		} finally {
-			if (blobStoreService != null) {
-				blobStoreService.closeAndCleanupAllData();
-			}
+			blobStoreService.closeAndCleanupAllData();
 		}
 	}
 
@@ -282,75 +276,61 @@ public class HDFSTest {
 		org.apache.flink.configuration.Configuration
 			config = new org.apache.flink.configuration.Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-		config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
 
-		BlobStoreService blobStoreService = null;
+		BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
 
 		try {
-			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
-
 			BlobServerCorruptionTest.testGetFailsFromCorruptFile(config, blobStoreService, exception);
 		} finally {
-			if (blobStoreService != null) {
-				blobStoreService.closeAndCleanupAllData();
-			}
+			blobStoreService.closeAndCleanupAllData();
 		}
 	}
 
 	/**
 	 * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any
-	 * participating BlobServer when uploaded via a {@link org.apache.flink.runtime.blob.BlobCache}.
+	 * participating BlobServer when uploaded via a BLOB cache.
 	 */
 	@Test
 	public void testBlobCacheRecovery() throws Exception {
 		org.apache.flink.configuration.Configuration
 			config = new org.apache.flink.configuration.Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-		config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
 
-		BlobStoreService blobStoreService = null;
+		BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
 
 		try {
-			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
-
 			BlobCacheRecoveryTest.testBlobCacheRecovery(config, blobStoreService);
 		} finally {
-			if (blobStoreService != null) {
-				blobStoreService.closeAndCleanupAllData();
-			}
+			blobStoreService.closeAndCleanupAllData();
 		}
 	}
 
 	/**
 	 * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed corrupted JARs are
-	 * recognised during the download via a {@link org.apache.flink.runtime.blob.BlobCache}.
+	 * recognised during the download via a BLOB cache.
 	 */
 	@Test
 	public void testBlobCacheCorruptedFile() throws Exception {
 		org.apache.flink.configuration.Configuration
 			config = new org.apache.flink.configuration.Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-		config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			temporaryFolder.newFolder().getAbsolutePath());
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
 
-		BlobStoreService blobStoreService = null;
+		BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
 
 		try {
-			blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
-
-			BlobCacheCorruptionTest.testGetFailsFromCorruptFile(new JobID(), true, true, config, blobStoreService, exception);
+			BlobCacheCorruptionTest
+				.testGetFailsFromCorruptFile(new JobID(), config, blobStoreService, exception);
 		} finally {
-			if (blobStoreService != null) {
-				blobStoreService.closeAndCleanupAllData();
-			}
+			blobStoreService.closeAndCleanupAllData();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
new file mode 100644
index 0000000..dc031e0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Abstract base class for permanent and transient BLOB files.
+ */
+public abstract class AbstractBlobCache implements Closeable {
+
+	/**
+	 * The log object used for debugging.
+	 */
+	protected final Logger log;
+
+	/**
+	 * Counter to generate unique names for temporary files.
+	 */
+	protected final AtomicLong tempFileCounter = new AtomicLong(0);
+
+	protected final InetSocketAddress serverAddress;
+
+	/**
+	 * Root directory for local file storage.
+	 */
+	protected final File storageDir;
+
+	/**
+	 * Blob store for distributed file storage, e.g. in HA.
+	 */
+	protected final BlobView blobView;
+
+	protected final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+	/**
+	 * Shutdown hook thread to ensure deletion of the local storage directory.
+	 */
+	protected final Thread shutdownHook;
+
+	/**
+	 * The number of retries when the transfer fails.
+	 */
+	protected final int numFetchRetries;
+
+	/**
+	 * Configuration for the blob client like ssl parameters required to connect to the blob
+	 * server.
+	 */
+	protected final Configuration blobClientConfig;
+
+	/**
+	 * Lock guarding concurrent file accesses.
+	 */
+	protected final ReadWriteLock readWriteLock;
+
+	public AbstractBlobCache(
+			final InetSocketAddress serverAddress,
+			final Configuration blobClientConfig,
+			final BlobView blobView,
+			final Logger logger) throws IOException {
+
+		this.log = checkNotNull(logger);
+
+		this.serverAddress = checkNotNull(serverAddress);
+		this.blobClientConfig = checkNotNull(blobClientConfig);
+		this.blobView = checkNotNull(blobView);
+		this.readWriteLock = new ReentrantReadWriteLock();
+
+		// configure and create the storage directory
+		String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
+		this.storageDir = BlobUtils.initLocalStorageDirectory(storageDirectory);
+		log.info("Created BLOB cache storage directory " + storageDir);
+
+		// configure the number of fetch retries
+		final int fetchRetries = blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES);
+		if (fetchRetries >= 0) {
+			this.numFetchRetries = fetchRetries;
+		} else {
+			log.warn("Invalid value for {}. System will attempt no retries on failed fetch operations of BLOBs.",
+				BlobServerOptions.FETCH_RETRIES.key());
+			this.numFetchRetries = 0;
+		}
+
+		// Add shutdown hook to delete storage directory
+		shutdownHook = BlobUtils.addShutdownHook(this, log);
+	}
+
+	/**
+	 * Returns local copy of the file for the BLOB with the given key.
+	 *
+	 * <p>The method will first attempt to serve the BLOB from its local cache. If the BLOB is not
+	 * in the cache, the method will try to download it from this cache's BLOB server via a
+	 * distributed BLOB store (if available) or direct end-to-end download.
+	 *
+	 * @param jobId
+	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+	 * @param blobKey
+	 * 		The key of the desired BLOB.
+	 *
+	 * @return file referring to the local storage location of the BLOB.
+	 *
+	 * @throws IOException
+	 * 		Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
+	 */
+	protected File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOException {
+		checkArgument(blobKey != null, "BLOB key cannot be null.");
+
+		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
+		readWriteLock.readLock().lock();
+
+		try {
+			if (localFile.exists()) {
+				return localFile;
+			}
+		} finally {
+			readWriteLock.readLock().unlock();
+		}
+
+		// first try the distributed blob store (if available)
+		// use a temporary file (thread-safe without locking)
+		File incomingFile = createTemporaryFilename();
+		try {
+			try {
+				if (blobView.get(jobId, blobKey, incomingFile)) {
+					// now move the temp file to our local cache atomically
+					BlobUtils.moveTempFileToStore(
+						incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), log, null);
+
+					return localFile;
+				}
+			} catch (Exception e) {
+				log.info("Failed to copy from blob store. Downloading from BLOB server instead.", e);
+			}
+
+			// fallback: download from the BlobServer
+			BlobClient.downloadFromBlobServer(
+				jobId, blobKey, incomingFile, serverAddress, blobClientConfig, numFetchRetries);
+
+			BlobUtils.moveTempFileToStore(
+				incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), log, null);
+
+			return localFile;
+		} finally {
+			// delete incomingFile from a failed download
+			if (!incomingFile.delete() && incomingFile.exists()) {
+				log.warn("Could not delete the staging file {} for blob key {} and job {}.",
+					incomingFile, blobKey, jobId);
+			}
+		}
+	}
+
+	/**
+	 * Returns the port the BLOB server is listening on.
+	 *
+	 * @return BLOB server port
+	 */
+	public int getPort() {
+		return serverAddress.getPort();
+	}
+
+	/**
+	 * Returns a temporary file inside the BLOB server's incoming directory.
+	 *
+	 * @return a temporary file inside the BLOB server's incoming directory
+	 *
+	 * @throws IOException
+	 * 		if creating the directory fails
+	 */
+	File createTemporaryFilename() throws IOException {
+		return new File(BlobUtils.getIncomingDirectory(storageDir),
+			String.format("temp-%08d", tempFileCounter.getAndIncrement()));
+	}
+
+	@Override
+	public void close() throws IOException {
+		cancelCleanupTask();
+
+		if (shutdownRequested.compareAndSet(false, true)) {
+			log.info("Shutting down BLOB cache");
+
+			// Clean up the storage directory
+			try {
+				FileUtils.deleteDirectory(storageDir);
+			} finally {
+				// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself
+				if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
+					try {
+						Runtime.getRuntime().removeShutdownHook(shutdownHook);
+					} catch (IllegalStateException e) {
+						// race, JVM is in shutdown already, we can safely ignore this
+					} catch (Throwable t) {
+						log.warn("Exception while unregistering BLOB cache's cleanup shutdown hook.");
+					}
+				}
+			}
+		}
+	}
+
+	/**
+	 * Cancels any cleanup task that subclasses may be executing.
+	 *
+	 * <p>This is called during {@link #close()}.
+	 */
+	protected abstract void cancelCleanupTask();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
deleted file mode 100644
index 2a20015..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.blob;
-
-import org.apache.flink.configuration.Configuration;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-/**
- * The BLOB cache provides access to BLOB services for permanent and transient BLOBs.
- */
-public class BlobCache implements BlobService {
-
-	/** Caching store for permanent BLOBs. */
-	private final PermanentBlobCache permanentBlobStore;
-
-	/** Store for transient BLOB files. */
-	private final TransientBlobCache transientBlobStore;
-
-	/**
-	 * Instantiates a new BLOB cache.
-	 *
-	 * @param serverAddress
-	 * 		address of the {@link BlobServer} to use for fetching files from
-	 * @param blobClientConfig
-	 * 		global configuration
-	 * @param blobView
-	 * 		(distributed) blob store file system to retrieve files from first
-	 *
-	 * @throws IOException
-	 * 		thrown if the (local or distributed) file storage cannot be created or is not usable
-	 */
-	public BlobCache(
-			final InetSocketAddress serverAddress,
-			final Configuration blobClientConfig,
-			final BlobView blobView) throws IOException {
-
-		this.permanentBlobStore = new PermanentBlobCache(serverAddress, blobClientConfig, blobView);
-		this.transientBlobStore = new TransientBlobCache(serverAddress, blobClientConfig);
-	}
-
-	@Override
-	public PermanentBlobCache getPermanentBlobStore() {
-		return permanentBlobStore;
-	}
-
-	@Override
-	public TransientBlobCache getTransientBlobStore() {
-		return transientBlobStore;
-	}
-
-	@Override
-	public void close() throws IOException {
-		permanentBlobStore.close();
-		transientBlobStore.close();
-	}
-
-	@Override
-	public int getPort() {
-		// NOTE: both blob stores connect to the same server!
-		return permanentBlobStore.getPort();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheService.java
new file mode 100644
index 0000000..89ce2c4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheService.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The BLOB cache provides access to BLOB services for permanent and transient BLOBs.
+ */
+public class BlobCacheService implements BlobService {
+
+	/** Caching store for permanent BLOBs. */
+	private final PermanentBlobCache permanentBlobCache;
+
+	/** Store for transient BLOB files. */
+	private final TransientBlobCache transientBlobCache;
+
+	/**
+	 * Instantiates a new BLOB cache.
+	 *
+	 * @param serverAddress
+	 * 		address of the {@link BlobServer} to use for fetching files from
+	 * @param blobClientConfig
+	 * 		global configuration
+	 * @param blobView
+	 * 		(distributed) blob store file system to retrieve files from first
+	 *
+	 * @throws IOException
+	 * 		thrown if the (local or distributed) file storage cannot be created or is not usable
+	 */
+	public BlobCacheService(
+			final InetSocketAddress serverAddress,
+			final Configuration blobClientConfig,
+			final BlobView blobView) throws IOException {
+
+		this(new PermanentBlobCache(serverAddress, blobClientConfig, blobView),
+			new TransientBlobCache(serverAddress, blobClientConfig));
+	}
+
+	/**
+	 * Instantiates a new BLOB cache.
+	 *
+	 * @param permanentBlobCache
+	 * 		BLOB cache to use for permanent BLOBs
+	 * @param transientBlobCache
+	 * 		BLOB cache to use for transient BLOBs
+	 */
+	public BlobCacheService(
+			PermanentBlobCache permanentBlobCache, TransientBlobCache transientBlobCache) {
+		this.permanentBlobCache = checkNotNull(permanentBlobCache);
+		this.transientBlobCache = checkNotNull(transientBlobCache);
+	}
+
+	@Override
+	public PermanentBlobCache getPermanentBlobService() {
+		return permanentBlobCache;
+	}
+
+	@Override
+	public TransientBlobCache getTransientBlobService() {
+		return transientBlobCache;
+	}
+
+	@Override
+	public void close() throws IOException {
+		permanentBlobCache.close();
+		transientBlobCache.close();
+	}
+
+	@Override
+	public int getPort() {
+		// NOTE: both blob stores connect to the same server!
+		return permanentBlobCache.getPort();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index a8ae471..3154f69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.InstantiationUtil;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +34,7 @@ import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLParameters;
 import javax.net.ssl.SSLSocket;
+
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.File;
@@ -49,14 +51,13 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_FOR_JOB;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_NO_JOB;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_FOR_JOB_HA;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_UNRELATED_CONTENT;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobUtils.readFully;
 import static org.apache.flink.runtime.blob.BlobUtils.readLength;
 import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
@@ -76,7 +77,7 @@ public final class BlobClient implements Closeable {
 
 	/**
 	 * Instantiates a new BLOB client.
-	 * 
+	 *
 	 * @param serverAddress
 	 *        the network address of the BLOB server
 	 * @param clientConfig
@@ -126,12 +127,13 @@ public final class BlobClient implements Closeable {
 	/**
 	 * Downloads the given BLOB from the given server and stores its contents to a (local) file.
 	 *
+	 * <p>Transient BLOB files are deleted after a successful copy of the server's data into the
+	 * given <tt>localJarFile</tt>.
+	 *
 	 * @param jobId
 	 * 		job ID the BLOB belongs to or <tt>null</tt> if job-unrelated
 	 * @param blobKey
 	 * 		BLOB key
-	 * @param permanentBlob
-	 * 		whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
 	 * @param localJarFile
 	 * 		the local file to write to
 	 * @param serverAddress
@@ -145,9 +147,12 @@ public final class BlobClient implements Closeable {
 	 * 		if an I/O error occurs during the download
 	 */
 	static void downloadFromBlobServer(
-			@Nullable JobID jobId, BlobKey blobKey, boolean permanentBlob, File localJarFile,
-			InetSocketAddress serverAddress, Configuration blobClientConfig, int numFetchRetries)
-			throws IOException {
+			@Nullable JobID jobId,
+			BlobKey blobKey,
+			File localJarFile,
+			InetSocketAddress serverAddress,
+			Configuration blobClientConfig,
+			int numFetchRetries) throws IOException {
 
 		final byte[] buf = new byte[BUFFER_SIZE];
 		LOG.info("Downloading {}/{} from {}", jobId, blobKey, serverAddress);
@@ -157,7 +162,7 @@ public final class BlobClient implements Closeable {
 		while (true) {
 			try (
 				final BlobClient bc = new BlobClient(serverAddress, blobClientConfig);
-				final InputStream is = bc.getInternal(jobId, blobKey, permanentBlob);
+				final InputStream is = bc.getInternal(jobId, blobKey);
 				final OutputStream os = new FileOutputStream(localJarFile)
 			) {
 				while (true) {
@@ -168,7 +173,6 @@ public final class BlobClient implements Closeable {
 					os.write(buf, 0, read);
 				}
 
-				// success, we finished
 				return;
 			}
 			catch (Throwable t) {
@@ -176,7 +180,7 @@ public final class BlobClient implements Closeable {
 					" and store it under " + localJarFile.getAbsolutePath();
 				if (attempt < numFetchRetries) {
 					if (LOG.isDebugEnabled()) {
-						LOG.debug(message + " Retrying...", t);
+						LOG.error(message + " Retrying...", t);
 					} else {
 						LOG.error(message + " Retrying...");
 					}
@@ -213,8 +217,6 @@ public final class BlobClient implements Closeable {
 	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
 	 * @param blobKey
 	 * 		blob key associated with the requested file
-	 * @param permanentBlob
-	 * 		whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
 	 *
 	 * @return an input stream to read the retrieved data from
 	 *
@@ -223,7 +225,7 @@ public final class BlobClient implements Closeable {
 	 * @throws IOException
 	 * 		if an I/O error occurs during the download
 	 */
-	InputStream getInternal(@Nullable JobID jobId, BlobKey blobKey, boolean permanentBlob)
+	InputStream getInternal(@Nullable JobID jobId, BlobKey blobKey)
 			throws IOException {
 
 		if (this.socket.isClosed()) {
@@ -240,10 +242,10 @@ public final class BlobClient implements Closeable {
 			InputStream is = this.socket.getInputStream();
 
 			// Send GET header
-			sendGetHeader(os, jobId, blobKey, permanentBlob);
+			sendGetHeader(os, jobId, blobKey);
 			receiveAndCheckGetResponse(is);
 
-			return new BlobInputStream(is, blobKey);
+			return new BlobInputStream(is, blobKey, os);
 		}
 		catch (Throwable t) {
 			BlobUtils.closeSilently(socket, LOG);
@@ -260,36 +262,32 @@ public final class BlobClient implements Closeable {
 	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
 	 * @param blobKey
 	 * 		blob key associated with the requested file
-	 * @param permanentBlob
-	 * 		whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
 	 *
 	 * @throws IOException
 	 *         thrown if an I/O error occurs while writing the header data to the output stream
 	 */
 	private static void sendGetHeader(
-			OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey, boolean permanentBlob)
+			OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey)
 			throws IOException {
 		checkNotNull(blobKey);
-		checkArgument(jobId != null || !permanentBlob, "permanent BLOBs must be job-related");
+		checkArgument(jobId != null || blobKey instanceof TransientBlobKey,
+			"permanent BLOBs must be job-related");
 
 		// Signal type of operation
 		outputStream.write(GET_OPERATION);
 
 		// Send job ID and key
 		if (jobId == null) {
-			outputStream.write(CONTENT_NO_JOB);
-		} else if (permanentBlob) {
-			outputStream.write(CONTENT_FOR_JOB_HA);
-			outputStream.write(jobId.getBytes());
+			outputStream.write(JOB_UNRELATED_CONTENT);
 		} else {
-			outputStream.write(CONTENT_FOR_JOB);
+			outputStream.write(JOB_RELATED_CONTENT);
 			outputStream.write(jobId.getBytes());
 		}
 		blobKey.writeToOutputStream(outputStream);
 	}
 
 	/**
-	 * Reads the response from the input stream and throws in case of errors
+	 * Reads the response from the input stream and throws in case of errors.
 	 *
 	 * @param is
 	 * 		stream to read from
@@ -327,8 +325,8 @@ public final class BlobClient implements Closeable {
 	 * 		the read offset within the buffer
 	 * @param len
 	 * 		the number of bytes to read from the buffer
-	 * @param permanentBlob
-	 * 		whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 *
 	 * @return the computed BLOB key of the uploaded BLOB
 	 *
@@ -336,7 +334,7 @@ public final class BlobClient implements Closeable {
 	 * 		thrown if an I/O error occurs while uploading the data to the BLOB server
 	 */
 	BlobKey putBuffer(
-			@Nullable JobID jobId, byte[] value, int offset, int len, boolean permanentBlob)
+			@Nullable JobID jobId, byte[] value, int offset, int len, BlobKey.BlobType blobType)
 			throws IOException {
 
 		if (this.socket.isClosed()) {
@@ -354,7 +352,7 @@ public final class BlobClient implements Closeable {
 			final MessageDigest md = BlobUtils.createMessageDigest();
 
 			// Send the PUT header
-			sendPutHeader(os, jobId, permanentBlob);
+			sendPutHeader(os, jobId, blobType);
 
 			// Send the value in iterations of BUFFER_SIZE
 			int remainingBytes = len;
@@ -378,7 +376,7 @@ public final class BlobClient implements Closeable {
 
 			// Receive blob key and compare
 			final InputStream is = this.socket.getInputStream();
-			return receiveAndCheckPutResponse(is, md);
+			return receiveAndCheckPutResponse(is, md, blobType);
 		}
 		catch (Throwable t) {
 			BlobUtils.closeSilently(socket, LOG);
@@ -393,15 +391,15 @@ public final class BlobClient implements Closeable {
 	 * 		the ID of the job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
 	 * @param inputStream
 	 * 		the input stream to read the data from
-	 * @param permanentBlob
-	 * 		whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 *
 	 * @return the computed BLOB key of the uploaded BLOB
 	 *
 	 * @throws IOException
 	 * 		thrown if an I/O error occurs while uploading the data to the BLOB server
 	 */
-	BlobKey putInputStream(@Nullable JobID jobId, InputStream inputStream, boolean permanentBlob)
+	BlobKey putInputStream(@Nullable JobID jobId, InputStream inputStream, BlobKey.BlobType blobType)
 			throws IOException {
 
 		if (this.socket.isClosed()) {
@@ -420,7 +418,7 @@ public final class BlobClient implements Closeable {
 			final byte[] xferBuf = new byte[BUFFER_SIZE];
 
 			// Send the PUT header
-			sendPutHeader(os, jobId, permanentBlob);
+			sendPutHeader(os, jobId, blobType);
 
 			while (true) {
 				// since we don't know a total size here, send lengths iteratively
@@ -439,7 +437,7 @@ public final class BlobClient implements Closeable {
 
 			// Receive blob key and compare
 			final InputStream is = this.socket.getInputStream();
-			return receiveAndCheckPutResponse(is, md);
+			return receiveAndCheckPutResponse(is, md, blobType);
 		}
 		catch (Throwable t) {
 			BlobUtils.closeSilently(socket, LOG);
@@ -454,41 +452,42 @@ public final class BlobClient implements Closeable {
 	 * 		the output stream to write the PUT header data to
 	 * @param jobId
 	 * 		the ID of job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
-	 * @param permanentBlob
-	 * 		whether the BLOB is permanent (<tt>true</tt>) or transient (<tt>false</tt>)
+	 * @param blobType
+	 * 		whether the BLOB should become permanent or transient
 	 *
 	 * @throws IOException
 	 * 		thrown if an I/O error occurs while writing the header data to the output stream
 	 */
 	private static void sendPutHeader(
-			OutputStream outputStream, @Nullable JobID jobId, boolean permanentBlob)
+			OutputStream outputStream, @Nullable JobID jobId, BlobKey.BlobType blobType)
 			throws IOException {
 		// Signal type of operation
 		outputStream.write(PUT_OPERATION);
 		if (jobId == null) {
-			outputStream.write(CONTENT_NO_JOB);
-		} else if (permanentBlob) {
-			outputStream.write(CONTENT_FOR_JOB_HA);
-			outputStream.write(jobId.getBytes());
+			outputStream.write(JOB_UNRELATED_CONTENT);
 		} else {
-			outputStream.write(CONTENT_FOR_JOB);
+			outputStream.write(JOB_RELATED_CONTENT);
 			outputStream.write(jobId.getBytes());
 		}
+		outputStream.write(blobType.ordinal());
 	}
 
 	/**
-	 * Reads the response from the input stream and throws in case of errors
+	 * Reads the response from the input stream and throws in case of errors.
 	 *
 	 * @param is
 	 * 		stream to read from
 	 * @param md
 	 * 		message digest to check the response against
+	 * @param blobType
+	 * 		whether the BLOB should be permanent or transient
 	 *
 	 * @throws IOException
 	 * 		if the response is an error, the message digest does not match or reading the response
 	 * 		failed
 	 */
-	private static BlobKey receiveAndCheckPutResponse(InputStream is, MessageDigest md)
+	private static BlobKey receiveAndCheckPutResponse(
+			InputStream is, MessageDigest md, BlobKey.BlobType blobType)
 			throws IOException {
 		int response = is.read();
 		if (response < 0) {
@@ -497,7 +496,7 @@ public final class BlobClient implements Closeable {
 		else if (response == RETURN_OKAY) {
 
 			BlobKey remoteKey = BlobKey.readFromInputStream(is);
-			BlobKey localKey = new BlobKey(md.digest());
+			BlobKey localKey = BlobKey.createKey(blobType, md.digest());
 
 			if (!localKey.equals(remoteKey)) {
 				throw new IOException("Detected data corruption during transfer");
@@ -514,86 +513,6 @@ public final class BlobClient implements Closeable {
 		}
 	}
 
-	// --------------------------------------------------------------------------------------------
-	//  DELETE
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Deletes the (transient) BLOB identified by the given BLOB key and job ID from the BLOB
-	 * server.
-	 *
-	 * @param jobId
-	 * 		the ID of job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
-	 * @param key
-	 * 		the key to identify the BLOB
-	 *
-	 * @return <tt>true</tt> if the delete operation was successful at the {@link BlobServer};
-	 * <tt>false</tt> otherwise
-	 *
-	 * @throws IOException
-	 * 		thrown if an I/O error occurs while transferring the request to the BLOB server or if the
-	 * 		BLOB server throws an exception while processing the request
-	 */
-	boolean deleteInternal(@Nullable JobID jobId, BlobKey key)
-			throws IOException {
-
-		checkNotNull(key);
-
-		try {
-			final OutputStream outputStream = this.socket.getOutputStream();
-			final InputStream inputStream = this.socket.getInputStream();
-
-			// Signal type of operation
-			outputStream.write(DELETE_OPERATION);
-
-			// delete blob key
-			if (jobId == null) {
-				outputStream.write(CONTENT_NO_JOB);
-			} else {
-				outputStream.write(CONTENT_FOR_JOB);
-				outputStream.write(jobId.getBytes());
-			}
-			key.writeToOutputStream(outputStream);
-
-			return receiveAndCheckDeleteResponse(inputStream);
-		}
-		catch (Throwable t) {
-			BlobUtils.closeSilently(socket, LOG);
-			throw new IOException("DELETE operation failed: " + t.getMessage(), t);
-		}
-	}
-
-	/**
-	 * Reads the response from the input stream and throws in case of errors
-	 *
-	 * @param is
-	 * 		stream to read from
-	 *
-	 * @return  <tt>true</tt> if the delete operation was successful at the {@link BlobServer};
-	 *          <tt>false</tt> otherwise
-	 *
-	 * @throws IOException
-	 * 		if the server code throws an exception or if reading the response failed
-	 */
-	private static boolean receiveAndCheckDeleteResponse(InputStream is) throws IOException {
-		int response = is.read();
-		if (response < 0) {
-			throw new EOFException("Premature end of response");
-		}
-		if (response == RETURN_ERROR) {
-			Throwable cause = readExceptionFromStream(is);
-			if (cause == null) {
-				return false;
-			} else {
-				throw new IOException("Server side error: " + cause.getMessage(), cause);
-			}
-		}
-		else if (response != RETURN_OKAY) {
-			throw new IOException("Unrecognized response");
-		}
-		return true;
-	}
-
 	/**
 	 * Uploads the JAR files to the {@link PermanentBlobService} of the {@link BlobServer} at the
 	 * given address with HA as configured.
@@ -610,12 +529,16 @@ public final class BlobClient implements Closeable {
 	 * @throws IOException
 	 * 		if the upload fails
 	 */
-	public static List<BlobKey> uploadJarFiles(InetSocketAddress serverAddress,
-			Configuration clientConfig, JobID jobId, List<Path> jars) throws IOException {checkNotNull(jobId);
+	public static List<PermanentBlobKey> uploadJarFiles(
+			InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List<Path> jars)
+			throws IOException {
+
+		checkNotNull(jobId);
+
 		if (jars.isEmpty()) {
 			return Collections.emptyList();
 		} else {
-			List<BlobKey> blobKeys = new ArrayList<>();
+			List<PermanentBlobKey> blobKeys = new ArrayList<>();
 
 			try (BlobClient blobClient = new BlobClient(serverAddress, clientConfig)) {
 				for (final Path jar : jars) {
@@ -623,7 +546,8 @@ public final class BlobClient implements Closeable {
 					FSDataInputStream is = null;
 					try {
 						is = fs.open(jar);
-						final BlobKey key = blobClient.putInputStream(jobId, is, true);
+						final PermanentBlobKey key =
+							(PermanentBlobKey) blobClient.putInputStream(jobId, is, PERMANENT_BLOB);
 						blobKeys.add(key);
 					} finally {
 						if (is != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
index a89a461..7a73917 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
@@ -22,8 +22,12 @@ import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.security.MessageDigest;
+import java.util.Arrays;
 
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
 import static org.apache.flink.runtime.blob.BlobUtils.readLength;
 
 /**
@@ -38,6 +42,14 @@ final class BlobInputStream extends InputStream {
 	private final InputStream wrappedInputStream;
 
 	/**
+	 * The wrapped output stream from the underlying TCP connection.
+	 *
+	 * <p>This is used to signal the success or failure of the read operation after receiving the
+	 * whole BLOB and verifying the checksum.
+	 */
+	private final OutputStream wrappedOutputStream;
+
+	/**
 	 * The BLOB key if the GET operation has been performed on a content-addressable BLOB, otherwise <code>null<code>.
 	 */
 	private final BlobKey blobKey;
@@ -65,12 +77,17 @@ final class BlobInputStream extends InputStream {
 	 *        the underlying input stream to read from
 	 * @param blobKey
 	 *        the expected BLOB key for content-addressable BLOBs, <code>null</code> for non-content-addressable BLOBs.
+	 * @param wrappedOutputStream
+	 *        the underlying output stream to write the result to
+	 *
 	 * @throws IOException
 	 *         throws if an I/O error occurs while reading the BLOB data from the BLOB server
 	 */
-	BlobInputStream(final InputStream wrappedInputStream, final BlobKey blobKey) throws IOException {
+	BlobInputStream(
+		final InputStream wrappedInputStream, final BlobKey blobKey, OutputStream wrappedOutputStream) throws IOException {
 		this.wrappedInputStream = wrappedInputStream;
 		this.blobKey = blobKey;
+		this.wrappedOutputStream = wrappedOutputStream;
 		this.bytesToReceive = readLength(wrappedInputStream);
 		if (this.bytesToReceive < 0) {
 			throw new FileNotFoundException();
@@ -106,10 +123,12 @@ final class BlobInputStream extends InputStream {
 		if (this.md != null) {
 			this.md.update((byte) read);
 			if (this.bytesReceived == this.bytesToReceive) {
-				final BlobKey computedKey = new BlobKey(this.md.digest());
-				if (!computedKey.equals(this.blobKey)) {
+				final byte[] computedKey = this.md.digest();
+				if (!Arrays.equals(computedKey, this.blobKey.getHash())) {
+					this.wrappedOutputStream.write(RETURN_ERROR);
 					throw new IOException("Detected data corruption during transfer");
 				}
+				this.wrappedOutputStream.write(RETURN_OKAY);
 			}
 		}
 
@@ -140,10 +159,12 @@ final class BlobInputStream extends InputStream {
 		if (this.md != null) {
 			this.md.update(b, off, read);
 			if (this.bytesReceived == this.bytesToReceive) {
-				final BlobKey computedKey = new BlobKey(this.md.digest());
-				if (!computedKey.equals(this.blobKey)) {
+				final byte[] computedKey = this.md.digest();
+				if (!Arrays.equals(computedKey, this.blobKey.getHash())) {
+					this.wrappedOutputStream.write(RETURN_ERROR);
 					throw new IOException("Detected data corruption during transfer");
 				}
+				this.wrappedOutputStream.write(RETURN_OKAY);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
index bd254dd..0aa45e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.StringUtils;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -26,38 +29,67 @@ import java.io.Serializable;
 import java.security.MessageDigest;
 import java.util.Arrays;
 
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A BLOB key uniquely identifies a BLOB.
  */
-public final class BlobKey implements Serializable, Comparable<BlobKey> {
+abstract class BlobKey implements Serializable, Comparable<BlobKey> {
 
 	private static final long serialVersionUID = 3847117712521785209L;
 
-	/** Array of hex characters to facilitate fast toString() method. */
-	private static final char[] HEX_ARRAY = "0123456789abcdef".toCharArray();
-
 	/** Size of the internal BLOB key in bytes. */
 	private static final int SIZE = 20;
 
-	
 	/** The byte buffer storing the actual key data. */
 	private final byte[] key;
 
 	/**
+	 * (Internal) BLOB type - to be reflected by the inheriting sub-class.
+	 */
+	private final BlobType type;
+
+	/**
+	 * BLOB type, i.e. permanent or transient.
+	 */
+	enum BlobType {
+		/**
+		 * Indicates a permanent BLOB whose lifecycle is that of a job and which is made highly
+		 * available.
+		 */
+		PERMANENT_BLOB,
+		/**
+		 * Indicates a transient BLOB whose lifecycle is managed by the user and which is not made
+		 * highly available.
+		 */
+		TRANSIENT_BLOB
+	}
+
+	/**
 	 * Constructs a new BLOB key.
+	 *
+	 * @param type
+	 * 		whether the referenced BLOB is permanent or transient
 	 */
-	public BlobKey() {
+	protected BlobKey(BlobType type) {
+		this.type = checkNotNull(type);
 		this.key = new byte[SIZE];
 	}
 
 	/**
 	 * Constructs a new BLOB key from the given byte array.
-	 * 
+	 *
+	 * @param type
+	 * 		whether the referenced BLOB is permanent or transient
 	 * @param key
 	 *        the actual key data
 	 */
-	BlobKey(byte[] key) {
-		if (key.length != SIZE) {
+	protected BlobKey(BlobType type, byte[] key) {
+		this.type = checkNotNull(type);
+
+		if (key == null || key.length != SIZE) {
 			throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
 		}
 
@@ -65,6 +97,50 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
 	}
 
 	/**
+	 * Returns the right {@link BlobKey} subclass for the given parameters.
+	 *
+	 * @param type
+	 * 		whether the referenced BLOB is permanent or transient
+	 *
+	 * @return BlobKey subclass
+	 */
+	@VisibleForTesting
+	static BlobKey createKey(BlobType type) {
+		if (type == PERMANENT_BLOB) {
+            return new PermanentBlobKey();
+        } else {
+			return new TransientBlobKey();
+        }
+	}
+
+	/**
+	 * Returns the right {@link BlobKey} subclass for the given parameters.
+	 *
+	 * @param type
+	 * 		whether the referenced BLOB is permanent or transient
+	 * @param key
+	 *        the actual key data
+	 *
+	 * @return BlobKey subclass
+	 */
+	static BlobKey createKey(BlobType type, byte[] key) {
+		if (type == PERMANENT_BLOB) {
+            return new PermanentBlobKey(key);
+        } else {
+			return new TransientBlobKey(key);
+        }
+	}
+
+	/**
+	 * Returns the hash component of this key.
+	 *
+	 * @return a 20 bit hash of the contents the key refers to
+	 */
+	byte[] getHash() {
+		return key;
+	}
+
+	/**
 	 * Adds the BLOB key to the given {@link MessageDigest}.
 	 * 
 	 * @param md
@@ -83,30 +159,36 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
 
 		final BlobKey bk = (BlobKey) obj;
 
-		return Arrays.equals(this.key, bk.key);
+		return Arrays.equals(this.key, bk.key) && this.type == bk.type;
 	}
 
 	@Override
 	public int hashCode() {
-		return Arrays.hashCode(this.key);
+		int result = Arrays.hashCode(this.key);
+		result = 37 * result + this.type.hashCode();
+		return result;
 	}
 
 	@Override
 	public String toString() {
-		// from http://stackoverflow.com/questions/9655181/convert-from-byte-array-to-hex-string-in-java
-		final char[] hexChars = new char[SIZE * 2];
-		for (int i = 0; i < SIZE; ++i) {
-			int v = this.key[i] & 0xff;
-			hexChars[i * 2] = HEX_ARRAY[v >>> 4];
-			hexChars[i * 2 + 1] = HEX_ARRAY[v & 0x0f];
+		final String typeString;
+		switch (this.type) {
+			case TRANSIENT_BLOB:
+				typeString = "t-";
+				break;
+			case PERMANENT_BLOB:
+				typeString = "p-";
+				break;
+			default:
+				// this actually never happens!
+				throw new IllegalStateException("Invalid BLOB type");
 		}
-
-		return new String(hexChars);
+		return typeString + StringUtils.byteToHexString(this.key);
 	}
 
 	@Override
 	public int compareTo(BlobKey o) {
-	
+		// compare the hashes first
 		final byte[] aarr = this.key;
 		final byte[] barr = o.key;
 		final int len = Math.min(aarr.length, barr.length);
@@ -118,8 +200,13 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
 				return a - b;
 			}
 		}
-	
-		return aarr.length - barr.length;
+
+		if (aarr.length == barr.length) {
+			// same hash contents - compare the BLOB types
+			return this.type.compareTo(o.type);
+		} else {
+			return aarr.length - barr.length;
+		}
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -138,15 +225,30 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
 		final byte[] key = new byte[BlobKey.SIZE];
 
 		int bytesRead = 0;
-		while (bytesRead < BlobKey.SIZE) {
-			final int read = inputStream.read(key, bytesRead, BlobKey.SIZE - bytesRead);
+		// read key
+		while (bytesRead < key.length) {
+			final int read = inputStream.read(key, bytesRead, key.length - bytesRead);
 			if (read < 0) {
 				throw new EOFException("Read an incomplete BLOB key");
 			}
 			bytesRead += read;
 		}
+		// read BLOB type
+		final BlobType blobType;
+		{
+			final int read = inputStream.read();
+			if (read < 0) {
+				throw new EOFException("Read an incomplete BLOB type");
+			} else if (read == TRANSIENT_BLOB.ordinal()) {
+				blobType = TRANSIENT_BLOB;
+			} else if (read == PERMANENT_BLOB.ordinal()) {
+				blobType = PERMANENT_BLOB;
+			} else {
+				throw new IOException("Invalid data received for the BLOB type: " + read);
+			}
+		}
 
-		return new BlobKey(key);
+		return createKey(blobType, key);
 	}
 
 	/**
@@ -159,5 +261,6 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> {
 	 */
 	void writeToOutputStream(final OutputStream outputStream) throws IOException {
 		outputStream.write(this.key);
+		outputStream.write(this.type.ordinal());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 836d436..7804dfd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -26,11 +26,13 @@ import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.NetUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
@@ -50,6 +52,8 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -352,8 +356,8 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 * 		Thrown if the file retrieval failed.
 	 */
 	@Override
-	public File getFile(BlobKey key) throws IOException {
-		return getFileInternal(null, key, false);
+	public File getFile(TransientBlobKey key) throws IOException {
+		return getFileInternal(null, key);
 	}
 
 	/**
@@ -374,9 +378,9 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 * 		Thrown if the file retrieval failed.
 	 */
 	@Override
-	public File getFile(JobID jobId, BlobKey key) throws IOException {
+	public File getFile(JobID jobId, TransientBlobKey key) throws IOException {
 		checkNotNull(jobId);
-		return getFileInternal(jobId, key, false);
+		return getFileInternal(jobId, key);
 	}
 
 	/**
@@ -399,9 +403,9 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 * 		if any other error occurs when retrieving the file
 	 */
 	@Override
-	public File getHAFile(JobID jobId, BlobKey key) throws IOException {
+	public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
 		checkNotNull(jobId);
-		return getFileInternal(jobId, key, true);
+		return getFileInternal(jobId, key);
 	}
 
 	/**
@@ -415,22 +419,20 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
 	 * @param blobKey
 	 * 		blob key associated with the requested file
-	 * @param highlyAvailable
-	 * 		whether to the requested file is highly available (HA)
 	 *
 	 * @return file referring to the local storage location of the BLOB
 	 *
 	 * @throws IOException
 	 * 		Thrown if the file retrieval failed.
 	 */
-	private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable) throws IOException {
+	private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOException {
 		checkArgument(blobKey != null, "BLOB key cannot be null.");
 
 		final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
 		readWriteLock.readLock().lock();
 
 		try {
-			getFileInternal(jobId, blobKey, highlyAvailable, localFile);
+			getFileInternal(jobId, blobKey, localFile);
 			return localFile;
 		} finally {
 			readWriteLock.readLock().unlock();
@@ -450,20 +452,18 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 * 		ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
 	 * @param blobKey
 	 * 		blob key associated with the requested file
-	 * @param highlyAvailable
-	 * 		whether to the requested file is highly available (HA)
 	 * @param localFile
 	 *      (local) file where the blob is/should be stored
 	 *
 	 * @throws IOException
 	 * 		Thrown if the file retrieval failed.
 	 */
-	void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable, File localFile) throws IOException {
+	void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, File localFile) throws IOException {
 		// assume readWriteLock.readLock() was already locked (cannot really check that)
 
 		if (localFile.exists()) {
 			return;
-		} else if (highlyAvailable) {
+		} else if (blobKey instanceof PermanentBlobKey) {
 			// Try the HA blob store
 			// first we have to release the read lock in order to acquire the write lock
 			readWriteLock.readLock().unlock();
@@ -495,30 +495,30 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	}
 
 	@Override
-	public BlobKey put(byte[] value) throws IOException {
-		return putBuffer(null, value, false);
+	public TransientBlobKey putTransient(byte[] value) throws IOException {
+		return (TransientBlobKey) putBuffer(null, value, TRANSIENT_BLOB);
 	}
 
 	@Override
-	public BlobKey put(JobID jobId, byte[] value) throws IOException {
+	public TransientBlobKey putTransient(JobID jobId, byte[] value) throws IOException {
 		checkNotNull(jobId);
-		return putBuffer(jobId, value, false);
+		return (TransientBlobKey) putBuffer(jobId, value, TRANSIENT_BLOB);
 	}
 
 	@Override
-	public BlobKey put(InputStream inputStream) throws IOException {
-		return putInputStream(null, inputStream, false);
+	public TransientBlobKey putTransient(InputStream inputStream) throws IOException {
+		return (TransientBlobKey) putInputStream(null, inputStream, TRANSIENT_BLOB);
 	}
 
 	@Override
-	public BlobKey put(JobID jobId, InputStream inputStream) throws IOException {
+	public TransientBlobKey putTransient(JobID jobId, InputStream inputStream) throws IOException {
 		checkNotNull(jobId);
-		return putInputStream(jobId, inputStream, false);
+		return (TransientBlobKey) putInputStream(jobId, inputStream, TRANSIENT_BLOB);
 	}
 
 	/**
 	 * Uploads the data of the given byte array for the given job to the BLOB server and makes it
-	 * highly available (HA).
+	 * a permanent BLOB.
 	 *
 	 * @param jobId
 	 * 		the ID of the job the BLOB belongs to
@@ -531,14 +531,14 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 * 		thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA
 	 * 		store
 	 */
-	public BlobKey putHA(JobID jobId, byte[] value) throws IOException {
+	public PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException {
 		checkNotNull(jobId);
-		return putBuffer(jobId, value, true);
+		return (PermanentBlobKey) putBuffer(jobId, value, PERMANENT_BLOB);
 	}
 
 	/**
 	 * Uploads the data from the given input stream for the given job to the BLOB server and makes it
-	 * highly available (HA).
+	 * a permanent BLOB.
 	 *
 	 * @param jobId
 	 * 		ID of the job this blob belongs to
@@ -551,9 +551,9 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 * 		thrown if an I/O error occurs while reading the data from the input stream, writing it to a
 	 * 		local file, or uploading it to the HA store
 	 */
-	public BlobKey putHA(JobID jobId, InputStream inputStream) throws IOException {
+	public PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException {
 		checkNotNull(jobId);
-		return putInputStream(jobId, inputStream, true);
+		return (PermanentBlobKey) putInputStream(jobId, inputStream, PERMANENT_BLOB);
 	}
 
 	/**
@@ -563,8 +563,8 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 * 		the ID of the job the BLOB belongs to
 	 * @param value
 	 * 		the buffer to upload
-	 * @param highlyAvailable
-	 * 		whether to make the data highly available (HA)
+	 * @param blobType
+	 * 		whether to make the data permanent or transient
 	 *
 	 * @return the computed BLOB key identifying the BLOB on the server
 	 *
@@ -572,7 +572,7 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 * 		thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA
 	 * 		store
 	 */
-	private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, boolean highlyAvailable)
+	private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, BlobKey.BlobType blobType)
 			throws IOException {
 
 		if (LOG.isDebugEnabled()) {
@@ -586,10 +586,10 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 			md.update(value);
 			fos.write(value);
 
-			blobKey = new BlobKey(md.digest());
+			blobKey = BlobKey.createKey(blobType, md.digest());
 
 			// persist file
-			moveTempFileToStore(incomingFile, jobId, blobKey, highlyAvailable);
+			moveTempFileToStore(incomingFile, jobId, blobKey);
 
 			return blobKey;
 		} finally {
@@ -609,8 +609,8 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 * 		the ID of the job the BLOB belongs to
 	 * @param inputStream
 	 * 		the input stream to read the data from
-	 * @param highlyAvailable
-	 * 		whether to make the data highly available (HA)
+	 * @param blobType
+	 * 		whether to make the data permanent or transient
 	 *
 	 * @return the computed BLOB key identifying the BLOB on the server
 	 *
@@ -619,7 +619,7 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 * 		local file, or uploading it to the HA store
 	 */
 	private BlobKey putInputStream(
-			@Nullable JobID jobId, InputStream inputStream, boolean highlyAvailable)
+			@Nullable JobID jobId, InputStream inputStream, BlobKey.BlobType blobType)
 			throws IOException {
 
 		if (LOG.isDebugEnabled()) {
@@ -642,10 +642,10 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 				md.update(buf, 0, bytesRead);
 			}
 
-			blobKey = new BlobKey(md.digest());
+			blobKey = BlobKey.createKey(blobType, md.digest());
 
 			// persist file
-			moveTempFileToStore(incomingFile, jobId, blobKey, highlyAvailable);
+			moveTempFileToStore(incomingFile, jobId, blobKey);
 
 			return blobKey;
 		} finally {
@@ -667,21 +667,18 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 * 		ID of the job this blob belongs to or <tt>null</tt> if job-unrelated
 	 * @param blobKey
 	 * 		BLOB key identifying the file
-	 * @param highlyAvailable
-	 * 		whether this file should be stored in the HA store
 	 *
 	 * @throws IOException
 	 * 		thrown if an I/O error occurs while moving the file or uploading it to the HA store
 	 */
 	void moveTempFileToStore(
-			File incomingFile, @Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable)
-			throws IOException {
+			File incomingFile, @Nullable JobID jobId, BlobKey blobKey) throws IOException {
 
 		File storageFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
 
 		BlobUtils.moveTempFileToStore(
 			incomingFile, jobId, blobKey, storageFile, readWriteLock.writeLock(), LOG,
-			highlyAvailable ? blobStore : null);
+			blobKey instanceof PermanentBlobKey ? blobStore : null);
 	}
 
 	/**
@@ -695,7 +692,7 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 *          <tt>false</tt> otherwise
 	 */
 	@Override
-	public boolean delete(BlobKey key) {
+	public boolean deleteFromCache(TransientBlobKey key) {
 		return deleteInternal(null, key);
 	}
 
@@ -711,7 +708,7 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 *          <tt>false</tt> otherwise
 	 */
 	@Override
-	public boolean delete(JobID jobId, BlobKey key) {
+	public boolean deleteFromCache(JobID jobId, TransientBlobKey key) {
 		checkNotNull(jobId);
 		return deleteInternal(jobId, key);
 	}
@@ -727,7 +724,7 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	 * @return  <tt>true</tt> if the given blob is successfully deleted or non-existing;
 	 *          <tt>false</tt> otherwise
 	 */
-	boolean deleteInternal(@Nullable JobID jobId, BlobKey key) {
+	boolean deleteInternal(@Nullable JobID jobId, TransientBlobKey key) {
 		final File localFile =
 			new File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));
 
@@ -781,14 +778,13 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 		}
 	}
 
-
 	@Override
-	public PermanentBlobService getPermanentBlobStore() {
+	public PermanentBlobService getPermanentBlobService() {
 		return this;
 	}
 
 	@Override
-	public TransientBlobService getTransientBlobStore() {
+	public TransientBlobService getTransientBlobService() {
 		return this;
 	}
 
@@ -812,7 +808,7 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
 	}
 
 	/**
-	 * Access to the server socket, for testing
+	 * Access to the server socket, for testing.
 	 */
 	ServerSocket getServerSocket() {
 		return this.serverSocket;

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index 28d006a..be62581 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.InstantiationUtil;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,18 +38,19 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 
 import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_FOR_JOB;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_NO_JOB;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_FOR_JOB_HA;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_UNRELATED_CONTENT;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobUtils.closeSilently;
 import static org.apache.flink.runtime.blob.BlobUtils.readFully;
 import static org.apache.flink.runtime.blob.BlobUtils.readLength;
 import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -65,12 +67,12 @@ class BlobServerConnection extends Thread {
 	/** The BLOB server. */
 	private final BlobServer blobServer;
 
-	/** Read lock to synchronize file accesses */
+	/** Read lock to synchronize file accesses. */
 	private final Lock readLock;
 
 	/**
-	 * Creates a new BLOB connection for a client request
-	 * 
+	 * Creates a new BLOB connection for a client request.
+	 *
 	 * @param clientSocket The socket to read/write data.
 	 * @param blobServer The BLOB server.
 	 */
@@ -114,9 +116,6 @@ class BlobServerConnection extends Thread {
 				case GET_OPERATION:
 					get(inputStream, outputStream, new byte[BUFFER_SIZE]);
 					break;
-				case DELETE_OPERATION:
-					delete(inputStream, outputStream);
-					break;
 				default:
 					throw new IOException("Unknown operation " + operation);
 				}
@@ -150,6 +149,10 @@ class BlobServerConnection extends Thread {
 	/**
 	 * Handles an incoming GET request from a BLOB client.
 	 *
+	 * <p>Transient BLOB files are deleted after a successful read operation by the client. Note
+	 * that we do not enforce atomicity here, i.e. multiple clients reading from the same BLOB may
+	 * still succeed.
+	 *
 	 * @param inputStream
 	 * 		the input stream to read incoming data from
 	 * @param outputStream
@@ -173,7 +176,6 @@ class BlobServerConnection extends Thread {
 		final File blobFile;
 		final JobID jobId;
 		final BlobKey blobKey;
-		final boolean permanentBlob;
 
 		try {
 			// read HEADER contents: job ID, key, HA mode/permanent or transient BLOB
@@ -182,25 +184,21 @@ class BlobServerConnection extends Thread {
 				throw new EOFException("Premature end of GET request");
 			}
 
-			// Receive the
-			if (mode == CONTENT_NO_JOB) {
+			// Receive the jobId and key
+			if (mode == JOB_UNRELATED_CONTENT) {
 				jobId = null;
-				permanentBlob = false;
-			} else if (mode == CONTENT_FOR_JOB_HA) {
+			} else if (mode == JOB_RELATED_CONTENT) {
 				byte[] jidBytes = new byte[JobID.SIZE];
 				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
 				jobId = JobID.fromByteArray(jidBytes);
-				permanentBlob = true;
-			} else if (mode == CONTENT_FOR_JOB) {
-				byte[] jidBytes = new byte[JobID.SIZE];
-				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
-				jobId = JobID.fromByteArray(jidBytes);
-				permanentBlob = false;
 			} else {
 				throw new IOException("Unknown type of BLOB addressing: " + mode + '.');
 			}
 			blobKey = BlobKey.readFromInputStream(inputStream);
 
+			checkArgument(blobKey instanceof TransientBlobKey || jobId != null,
+				"Invalid BLOB addressing for permanent BLOBs");
+
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Received GET request for BLOB {}/{} from {}.", jobId,
 					blobKey, clientSocket.getInetAddress());
@@ -212,7 +210,7 @@ class BlobServerConnection extends Thread {
 			// up to here, an error can give a good message
 		}
 		catch (Throwable t) {
-			LOG.error("GET operation failed", t);
+			LOG.error("GET operation from {} failed.", clientSocket.getInetAddress(), t);
 			try {
 				writeErrorToStream(outputStream, t);
 			}
@@ -224,56 +222,73 @@ class BlobServerConnection extends Thread {
 			return;
 		}
 
-		readLock.lock();
-
 		try {
-			// copy the file to local store if it does not exist yet
+
+			readLock.lock();
 			try {
-				blobServer.getFileInternal(jobId, blobKey, permanentBlob, blobFile);
+				// copy the file to local store if it does not exist yet
+				try {
+					blobServer.getFileInternal(jobId, blobKey, blobFile);
 
-				// enforce a 2GB max for now (otherwise the protocol's length field needs to be increased)
-				if (blobFile.length() > Integer.MAX_VALUE) {
-					throw new IOException("BLOB size exceeds the maximum size (2 GB).");
-				}
+					// enforce a 2GB max for now (otherwise the protocol's length field needs to be increased)
+					if (blobFile.length() > Integer.MAX_VALUE) {
+						throw new IOException("BLOB size exceeds the maximum size (2 GB).");
+					}
 
-				outputStream.write(RETURN_OKAY);
-			} catch (Throwable t) {
-				LOG.error("GET operation failed", t);
-				try {
-					writeErrorToStream(outputStream, t);
+					outputStream.write(RETURN_OKAY);
+				} catch (Throwable t) {
+					LOG.error("GET operation failed for BLOB {}/{} from {}.", jobId,
+						blobKey, clientSocket.getInetAddress(), t);
+					try {
+						writeErrorToStream(outputStream, t);
+					} catch (IOException e) {
+						// since we are in an exception case, it means that we could not send the error
+						// ignore this
+					}
+					clientSocket.close();
+					return;
 				}
-				catch (IOException e) {
-					// since we are in an exception case, it means that we could not send the error
-					// ignore this
+
+				// from here on, we started sending data, so all we can do is close the connection when something happens
+				int blobLen = (int) blobFile.length();
+				writeLength(blobLen, outputStream);
+
+				try (FileInputStream fis = new FileInputStream(blobFile)) {
+					int bytesRemaining = blobLen;
+					while (bytesRemaining > 0) {
+						int read = fis.read(buf);
+						if (read < 0) {
+							throw new IOException("Premature end of BLOB file stream for " +
+								blobFile.getAbsolutePath());
+						}
+						outputStream.write(buf, 0, read);
+						bytesRemaining -= read;
+					}
 				}
-				clientSocket.close();
-				return;
+			} finally {
+				readLock.unlock();
 			}
 
-			// from here on, we started sending data, so all we can do is close the connection when something happens
-			int blobLen = (int) blobFile.length();
-			writeLength(blobLen, outputStream);
-
-			try (FileInputStream fis = new FileInputStream(blobFile)) {
-				int bytesRemaining = blobLen;
-				while (bytesRemaining > 0) {
-					int read = fis.read(buf);
-					if (read < 0) {
-						throw new IOException("Premature end of BLOB file stream for " + blobFile.getAbsolutePath());
-					}
-					outputStream.write(buf, 0, read);
-					bytesRemaining -= read;
+			// on successful transfer, delete transient files
+			int result = inputStream.read();
+			if (result < 0) {
+				throw new EOFException("Premature end of GET request");
+			} else if (blobKey instanceof TransientBlobKey && result == RETURN_OKAY) {
+				// ignore the result from the operation
+				if (!blobServer.deleteInternal(jobId, (TransientBlobKey) blobKey)) {
+					LOG.warn("DELETE operation failed for BLOB {}/{} from {}.", jobId,
+						blobKey, clientSocket.getInetAddress());
 				}
 			}
+
 		} catch (SocketException e) {
 			// happens when the other side disconnects
 			LOG.debug("Socket connection closed", e);
 		} catch (Throwable t) {
 			LOG.error("GET operation failed", t);
 			clientSocket.close();
-		} finally {
-			readLock.unlock();
 		}
+
 	}
 
 	/**
@@ -300,33 +315,40 @@ class BlobServerConnection extends Thread {
 			}
 
 			final JobID jobId;
-			final boolean permanentBlob;
-			if (mode == CONTENT_NO_JOB) {
+			if (mode == JOB_UNRELATED_CONTENT) {
 				jobId = null;
-				permanentBlob = false;
-			} else if (mode == CONTENT_FOR_JOB_HA) {
+			} else if (mode == JOB_RELATED_CONTENT) {
 				byte[] jidBytes = new byte[JobID.SIZE];
 				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
 				jobId = JobID.fromByteArray(jidBytes);
-				permanentBlob = true;
-			} else if (mode == CONTENT_FOR_JOB) {
-				byte[] jidBytes = new byte[JobID.SIZE];
-				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
-				jobId = JobID.fromByteArray(jidBytes);
-				permanentBlob = false;
 			} else {
 				throw new IOException("Unknown type of BLOB addressing.");
 			}
 
+			final BlobKey.BlobType blobType;
+			{
+				final int read = inputStream.read();
+				if (read < 0) {
+					throw new EOFException("Read an incomplete BLOB type");
+				} else if (read == TRANSIENT_BLOB.ordinal()) {
+					blobType = TRANSIENT_BLOB;
+				} else if (read == PERMANENT_BLOB.ordinal()) {
+					blobType = PERMANENT_BLOB;
+					checkArgument(jobId != null, "Invalid BLOB addressing for permanent BLOBs");
+				} else {
+					throw new IOException("Invalid data received for the BLOB type: " + read);
+				}
+			}
+
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Received PUT request for BLOB of job {} with from {}.", jobId,
 					clientSocket.getInetAddress());
 			}
 
 			incomingFile = blobServer.createTemporaryFilename();
-			BlobKey blobKey = readFileFully(inputStream, incomingFile, buf);
+			BlobKey blobKey = readFileFully(inputStream, incomingFile, buf, blobType);
 
-			blobServer.moveTempFileToStore(incomingFile, jobId, blobKey, permanentBlob);
+			blobServer.moveTempFileToStore(incomingFile, jobId, blobKey);
 
 			// Return computed key to client for validation
 			outputStream.write(RETURN_OKAY);
@@ -365,6 +387,8 @@ class BlobServerConnection extends Thread {
 	 * 		file to write to
 	 * @param buf
 	 * 		An auxiliary buffer for data serialization/deserialization
+	 * @param blobType
+	 * 		whether to make the data permanent or transient
 	 *
 	 * @return the received file's content hash as a BLOB key
 	 *
@@ -372,7 +396,7 @@ class BlobServerConnection extends Thread {
 	 * 		thrown if an I/O error occurs while reading/writing data from/to the respective streams
 	 */
 	private static BlobKey readFileFully(
-			final InputStream inputStream, final File incomingFile, final byte[] buf)
+			final InputStream inputStream, final File incomingFile, final byte[] buf, BlobKey.BlobType blobType)
 			throws IOException {
 		MessageDigest md = BlobUtils.createMessageDigest();
 
@@ -393,59 +417,7 @@ class BlobServerConnection extends Thread {
 
 				md.update(buf, 0, bytesExpected);
 			}
-			return new BlobKey(md.digest());
-		}
-	}
-
-	/**
-	 * Handles an incoming DELETE request from a BLOB client.
-	 *
-	 * @param inputStream
-	 * 		The input stream to read the request from.
-	 * @param outputStream
-	 * 		The output stream to write the response to.
-	 *
-	 * @throws IOException
-	 * 		Thrown if an I/O error occurs while reading the request data from the input stream.
-	 */
-	private void delete(InputStream inputStream, OutputStream outputStream) throws IOException {
-
-		try {
-			// read HEADER contents: job ID, key, HA mode/permanent or transient BLOB
-			final int mode = inputStream.read();
-			if (mode < 0) {
-				throw new EOFException("Premature end of DELETE request");
-			}
-
-			final JobID jobId;
-			if (mode == CONTENT_NO_JOB) {
-				jobId = null;
-			} else if (mode == CONTENT_FOR_JOB) {
-				byte[] jidBytes = new byte[JobID.SIZE];
-				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
-				jobId = JobID.fromByteArray(jidBytes);
-			} else {
-				throw new IOException("Unknown type of BLOB addressing.");
-			}
-			BlobKey key = BlobKey.readFromInputStream(inputStream);
-
-			if (!blobServer.deleteInternal(jobId, key)) {
-				LOG.error("DELETE operation failed");
-				writeErrorToStream(outputStream, null);
-			} else {
-				outputStream.write(RETURN_OKAY);
-			}
-		}
-		catch (Throwable t) {
-			LOG.error("DELETE operation failed", t);
-			try {
-				writeErrorToStream(outputStream, t);
-			}
-			catch (IOException e) {
-				// since we are in an exception case, it means not much that we could not send the error
-				// ignore this
-			}
-			clientSocket.close();
+			return BlobKey.createKey(blobType, md.digest());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
index 35fca14..5c9c7b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
@@ -26,15 +26,20 @@ public class BlobServerProtocol {
 	/** The buffer size in bytes for network transfers. */
 	static final int BUFFER_SIZE = 65536; // 64 K
 
-	/** Internal code to identify a PUT operation. */
+	/**
+	 * Internal code to identify a PUT operation.
+	 *
+	 * <p>Note: previously, there was also <tt>DELETE_OPERATION</tt> (code <tt>2</tt>).
+	 */
 	static final byte PUT_OPERATION = 0;
 
-	/** Internal code to identify a GET operation. */
+	/**
+	 * Internal code to identify a GET operation.
+	 *
+	 * <p>Note: previously, there was also <tt>DELETE_OPERATION</tt> (code <tt>2</tt>).
+	 */
 	static final byte GET_OPERATION = 1;
 
-	/** Internal code to identify a DELETE operation. */
-	static final byte DELETE_OPERATION = 2;
-
 	/** Internal code to identify a successful operation. */
 	static final byte RETURN_OKAY = 0;
 
@@ -42,22 +47,18 @@ public class BlobServerProtocol {
 	static final byte RETURN_ERROR = 1;
 
 	/**
-	 * Internal code to identify a job-unrelated transient BLOB.
-	 * <p>
-	 * Note: previously, there was also <tt>NAME_ADDRESSABLE</tt> (code <tt>1</tt>) and
-	 * <tt>JOB_ID_SCOPE</tt> (code <tt>2</tt>).
-	 */
-	static final byte CONTENT_NO_JOB = 0;
-
-	/**
-	 * Internal code to identify a job-related transient BLOB.
+	 * Internal code to identify a job-unrelated BLOBs (only for transient BLOBs!).
+	 *
+	 * <p>Note: previously, there was also <tt>NAME_ADDRESSABLE</tt> (code <tt>1</tt>).
 	 */
-	static final byte CONTENT_FOR_JOB = 3;
+	static final byte JOB_UNRELATED_CONTENT = 0;
 
 	/**
-	 * Internal code to identify a job-related permanent BLOB.
+	 * Internal code to identify a job-related (permanent or transient) BLOBs.
+	 *
+	 * <p>Note: This is equal to the previous <tt>JOB_ID_SCOPE</tt> (code <tt>2</tt>).
 	 */
-	static final byte CONTENT_FOR_JOB_HA = 4;
+	static final byte JOB_RELATED_CONTENT = 2;
 
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
index b643343..174499a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
@@ -30,14 +30,14 @@ public interface BlobService extends Closeable {
 	 *
 	 * @return BLOB service
 	 */
-	PermanentBlobService getPermanentBlobStore();
+	PermanentBlobService getPermanentBlobService();
 
 	/**
 	 * Returns a BLOB service for accessing transient BLOBs.
 	 *
 	 * @return BLOB service
 	 */
-	TransientBlobService getTransientBlobStore();
+	TransientBlobService getTransientBlobService();
 
 	/**
 	 * Returns the port of the BLOB server that this BLOB service is working with.

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index ebae4f4..d8223c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -462,7 +462,7 @@ public class BlobUtils {
 						"BlobServer use the same storage directory.");
 					// we cannot be sure at this point whether the file has already been uploaded to the blob
 					// store or not. Even if the blobStore might shortly be in an inconsistent state, we have
-					// persist the blob. Otherwise we might not be able to recover the job.
+					// to persist the blob. Otherwise we might not be able to recover the job.
 				}
 
 				if (blobStore != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/84a07a34/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index 1f9af03..4fed4cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.security.MessageDigest;
+import java.util.Arrays;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -113,8 +114,8 @@ public class FileSystemBlobStore implements BlobStoreService {
 			}
 
 			// verify that file contents are correct
-			final BlobKey computedKey = new BlobKey(md.digest());
-			if (!computedKey.equals(blobKey)) {
+			final byte[] computedKey = md.digest();
+			if (!Arrays.equals(computedKey, blobKey.getHash())) {
 				throw new IOException("Detected data corruption during transfer");
 			}
 


Mime
View raw message