flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/4] Integration of new BLOB service.
Date Mon, 06 Oct 2014 14:31:42 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 854708c..5e95d34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -49,6 +49,7 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.runtime.ExecutionMode;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorEvent;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.AbstractJobResult;
 import org.apache.flink.runtime.client.AbstractJobResult.ReturnCode;
 import org.apache.flink.runtime.client.JobCancelResult;
@@ -56,7 +57,7 @@ import org.apache.flink.runtime.client.JobProgressResult;
 import org.apache.flink.runtime.client.JobSubmissionResult;
 import org.apache.flink.runtime.event.job.AbstractEvent;
 import org.apache.flink.runtime.event.job.RecentJobEvent;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
@@ -111,7 +112,8 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 	
 	
 	/** Executor service for asynchronous commands (to relieve the RPC threads of work) */
-	private final ExecutorService executorService = Executors.newFixedThreadPool(2 * Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
+	private final ExecutorService executorService = Executors.newFixedThreadPool(2 * Hardware
+			.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
 	
 
 	/** The RPC end point through which the JobManager gets its calls */
@@ -125,8 +127,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 	
 	/** The currently running jobs */
 	private final ConcurrentHashMap<JobID, ExecutionGraph> currentJobs;
-	
-	
+
 	// begin: these will be consolidated / removed 
 	private final EventCollector eventCollector;
 	
@@ -136,13 +137,14 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 	
 	private final int recommendedClientPollingInterval;
 	// end: these will be consolidated / removed
-	
-	
+
 	private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false);
 	
 	private volatile boolean isShutDown;
 	
 	private WebInfoServer server;
+
+	private BlobLibraryCacheManager libraryCacheManager;
 	
 	
 	// --------------------------------------------------------------------------------------------
@@ -171,6 +173,9 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 
 		// Load the job progress collector
 		this.eventCollector = new EventCollector(this.recommendedClientPollingInterval);
+
+		this.libraryCacheManager = new BlobLibraryCacheManager(new BlobServer(),
+				GlobalConfiguration.getConfiguration());
 		
 		// Register simple job archive
 		int archived_items = GlobalConfiguration.getInteger(
@@ -250,6 +255,15 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 			this.instanceManager.shutdown();
 		}
 
+		// Stop the BLOB server
+		if (this.libraryCacheManager != null) {
+			try {
+				this.libraryCacheManager.shutdown();
+			} catch (IOException e) {
+				LOG.warn("Could not properly shutdown the library cache manager.", e);
+			}
+		}
+
 		// Stop RPC server
 		if (this.jobManagerServer != null) {
 			this.jobManagerServer.stop();
@@ -284,12 +298,14 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 		}
 		
 		ExecutionGraph executionGraph = null;
-		boolean success = false;
-		
+
 		try {
 			if (LOG.isInfoEnabled()) {
 				LOG.info(String.format("Received job %s (%s)", job.getJobID(), job.getName()));
 			}
+
+			// Register this job with the library cache manager
+			libraryCacheManager.register(job.getJobID(), job.getUserJarBlobKeys());
 			
 			// get the existing execution graph (if we attach), or construct a new empty one to attach
 			executionGraph = this.currentJobs.get(job.getJobID());
@@ -298,7 +314,8 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 					LOG.info("Creating new execution graph for job " + job.getJobID() + " (" + job.getName() + ')');
 				}
 				
-				executionGraph = new ExecutionGraph(job.getJobID(), job.getName(), job.getJobConfiguration(), this.executorService);
+				executionGraph = new ExecutionGraph(job.getJobID(), job.getName(),
+						job.getJobConfiguration(), job.getUserJarBlobKeys(), this.executorService);
 				ExecutionGraph previous = this.currentJobs.putIfAbsent(job.getJobID(), executionGraph);
 				if (previous != null) {
 					throw new JobException("Concurrent submission of a job with the same jobId: " + job.getJobID());
@@ -309,41 +326,32 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 					LOG.info(String.format("Found existing execution graph for id %s, attaching this job.", job.getJobID()));
 				}
 			}
+
+			// Register for updates on the job status
+			executionGraph.registerJobStatusListener(this);
 			
 			// grab the class loader for user-defined code
-			final ClassLoader userCodeLoader = LibraryCacheManager.getClassLoader(job.getJobID());
+			final ClassLoader userCodeLoader = libraryCacheManager.getClassLoader(job.getJobID());
 			if (userCodeLoader == null) {
 				throw new JobException("The user code class loader could not be initialized.");
 			}
-			
-			String[] jarFilesForJob = LibraryCacheManager.getRequiredJarFiles(job.getJobID());
-			for (String fileId : jarFilesForJob) {
-				executionGraph.addUserCodeJarFile(fileId);
-			}
-			
+
 			// first, perform the master initialization of the nodes
 			if (LOG.isDebugEnabled()) {
 				LOG.debug(String.format("Running master initialization of job %s (%s)", job.getJobID(), job.getName()));
 			}
-			try {
-				for (AbstractJobVertex vertex : job.getVertices()) {
-					// check that the vertex has an executable class
-					String executableClass = vertex.getInvokableClassName();
-					if (executableClass == null || executableClass.length() == 0) {
-						throw new JobException(String.format("The vertex %s (%s) has no invokable class.", vertex.getID(), vertex.getName()));
-					}
-					
-					// master side initialization
-					vertex.initializeOnMaster(userCodeLoader);
+
+			for (AbstractJobVertex vertex : job.getVertices()) {
+				// check that the vertex has an executable class
+				String executableClass = vertex.getInvokableClassName();
+				if (executableClass == null || executableClass.length() == 0) {
+					throw new JobException(String.format("The vertex %s (%s) has no invokable class.", vertex.getID(), vertex.getName()));
 				}
+
+				// master side initialization
+				vertex.initializeOnMaster(userCodeLoader);
 			}
-			catch (FileNotFoundException e) {
-				String message = "File-not-Found: " + e.getMessage();
-				LOG.error(message);
-				executionGraph.fail(e);
-				return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, e.getMessage());
-			}
-			
+
 			// first topologically sort the job vertices to form the basis of creating the execution graph
 			List<AbstractJobVertex> topoSorted = job.getVerticesSortedTopologicallyFromSources();
 			
@@ -366,9 +374,6 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 				this.eventCollector.registerJob(executionGraph, false, System.currentTimeMillis());
 			}
 	
-			// Register for updates on the job status
-			executionGraph.registerJobStatusListener(this);
-	
 			// Schedule job
 			if (LOG.isInfoEnabled()) {
 				LOG.info("Scheduling job " + job.getName());
@@ -376,41 +381,27 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 	
 			executionGraph.scheduleForExecution(this.scheduler);
 	
-			// Return on success
-			success = true;
 			return new JobSubmissionResult(AbstractJobResult.ReturnCode.SUCCESS, null);
 		}
 		catch (Throwable t) {
 			LOG.error("Job submission failed.", t);
-			executionGraph.fail(t);
-			return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(t));
-		}
-		finally {
-			if (!success) {
-				if (executionGraph != null) {
-					if (executionGraph.getState() != JobStatus.FAILING && executionGraph.getState() != JobStatus.FAILED) {
-						executionGraph.fail(new Exception("Could not set up and start execution graph on JobManager"));
-					}
-					try {
-						executionGraph.waitForJobEnd(10000);
-					} catch (InterruptedException e) {
-						LOG.error("Interrupted while waiting for job to finish canceling.");
-					}
-				}
-				
-				this.currentJobs.remove(job.getJobID());
-				
+			if(executionGraph != null){
+				executionGraph.fail(t);
+
 				try {
-					LibraryCacheManager.unregister(job.getJobID());
-				}
-				catch (IllegalStateException e) {
-					// may happen if the job failed before being registered at the
-					// library cache manager
-				}
-				catch (Throwable t) {
-					LOG.error("Error while de-registering job at library cache manager.", t);
+					executionGraph.waitForJobEnd(10000);
+				}catch(InterruptedException e){
+					LOG.error("Interrupted while waiting for job to finish canceling.");
 				}
 			}
+
+			// job was not properly removed by the fail call
+			if(currentJobs.contains(job.getJobID())){
+				currentJobs.remove(job.getJobID());
+				libraryCacheManager.unregister(job.getJobID());
+			}
+
+			return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(t));
 		}
 	}
 
@@ -495,7 +486,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 			this.currentJobs.remove(jid);
 			
 			try {
-				LibraryCacheManager.unregister(jid);
+				libraryCacheManager.unregister(jid);
 			}
 			catch (Throwable t) {
 				LOG.warn("Could not properly unregister job " + jid + " from the library cache.");
@@ -631,7 +622,9 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 
 	@Override
 	public void reportAccumulatorResult(AccumulatorEvent accumulatorEvent) throws IOException {
-		this.accumulatorManager.processIncomingAccumulators(accumulatorEvent.getJobID(), accumulatorEvent.getAccumulators(LibraryCacheManager.getClassLoader(accumulatorEvent.getJobID())));
+		this.accumulatorManager.processIncomingAccumulators(accumulatorEvent.getJobID(),
+				accumulatorEvent.getAccumulators(libraryCacheManager.getClassLoader(accumulatorEvent.getJobID()
+				)));
 	}
 
 	@Override
@@ -766,4 +759,9 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 		GlobalConfiguration.includeConfiguration(infoserverConfig);
 		return jobManager;
 	}
+
+	@Override
+	public int getBlobServerPort() {
+		return libraryCacheManager.getBlobServerPort();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 2d7afc8..1a378b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -19,8 +19,6 @@
 
 package org.apache.flink.runtime.operators;
 
-import java.io.IOException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.OutputFormat;
@@ -30,7 +28,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.io.network.api.MutableReader;
 import org.apache.flink.runtime.io.network.api.MutableRecordReader;
 import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
@@ -46,7 +43,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 
 /**
- * DataSinkTask which is executed by a Nephele task manager.
+ * DataSinkTask which is executed by a Flink task manager.
  * The task hands the data to an output format.
  * 
  * @see OutputFormat
@@ -78,9 +75,6 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 	// task configuration
 	private TaskConfig config;
 	
-	// class loader for user code
-	private ClassLoader userCodeClassLoader;
-
 	// cancel flag
 	private volatile boolean taskCanceled;
 	
@@ -128,7 +122,8 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 				// initialize sort local strategy
 				try {
 					// get type comparator
-					TypeComparatorFactory<IT> compFact = this.config.getInputComparator(0, this.userCodeClassLoader);
+					TypeComparatorFactory<IT> compFact = this.config.getInputComparator(0,
+							getUserCodeClassLoader());
 					if (compFact == null) {
 						throw new Exception("Missing comparator factory for local strategy on input " + 0);
 					}
@@ -250,15 +245,6 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 	}
 	
 	/**
-	 * Sets the class-loader to be used to load the user code.
-	 * 
-	 * @param cl The class-loader to be used to load the user code.
-	 */
-	public void setUserCodeClassLoader(ClassLoader cl) {
-		this.userCodeClassLoader = cl;
-	}
-
-	/**
 	 * Initializes the OutputFormat implementation and configuration.
 	 * 
 	 * @throws RuntimeException
@@ -266,19 +252,13 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 	 *         obtained.
 	 */
 	private void initOutputFormat() {
-		if (this.userCodeClassLoader == null) {
-			try {
-				this.userCodeClassLoader = LibraryCacheManager.getClassLoader(getEnvironment().getJobID());
-			} catch (IOException ioe) {
-				throw new RuntimeException("Library cache manager could not be instantiated.", ioe);
-			}
-		}
+		ClassLoader userCodeClassLoader = getUserCodeClassLoader();
 		// obtain task configuration (including stub parameters)
 		Configuration taskConf = getTaskConfiguration();
 		this.config = new TaskConfig(taskConf);
 
 		try {
-			this.format = config.<OutputFormat<IT>>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(OutputFormat.class, this.userCodeClassLoader);
+			this.format = config.<OutputFormat<IT>>getStubWrapper(userCodeClassLoader).getUserCodeObject(OutputFormat.class, userCodeClassLoader);
 
 			// check if the class is a subclass, if the check is required
 			if (!OutputFormat.class.isAssignableFrom(this.format.getClass())) {
@@ -331,7 +311,7 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 			throw new Exception("Illegal input group size in task configuration: " + groupSize);
 		}
 		
-		this.inputTypeSerializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
+		this.inputTypeSerializerFactory = this.config.getInputSerializer(0, getUserCodeClassLoader());
 		
 		if (this.inputTypeSerializerFactory.getDataType() == Record.class) {
 			// record specific deserialization

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index af3eff3..bfd2507 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.runtime.operators;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -35,7 +34,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.io.network.api.BufferWriter;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -76,8 +74,6 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 	// tasks chained to this data source
 	private ArrayList<ChainedDriver<?, ?>> chainedTasks;
 	
-	private ClassLoader userCodeClassLoader;
-
 	// cancel flag
 	private volatile boolean taskCanceled = false;
 
@@ -91,7 +87,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 		}
 
 		try {
-			initOutputs(this.userCodeClassLoader);
+			initOutputs(getUserCodeClassLoader());
 		} catch (Exception ex) {
 			throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " + 
 				ex.getMessage(), ex);
@@ -282,15 +278,6 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 	}
 	
 	/**
-	 * Sets the class-loader to be used to load the user code.
-	 * 
-	 * @param cl The class-loader to be used to load the user code.
-	 */
-	public void setUserCodeClassLoader(ClassLoader cl) {
-		this.userCodeClassLoader = cl;
-	}
-
-	/**
 	 * Initializes the InputFormat implementation and configuration.
 l	 * 
 	 * @throws RuntimeException
@@ -298,23 +285,14 @@ l	 *
 	 *         obtained.
 	 */
 	private void initInputFormat() {
-		if (this.userCodeClassLoader == null) {
-			try {
-				this.userCodeClassLoader = LibraryCacheManager.getClassLoader(getEnvironment().getJobID());
-			}
-			catch (IOException ioe) {
-				throw new RuntimeException("Usercode ClassLoader could not be obtained for job: " +
-						getEnvironment().getJobID(), ioe);
-			}
-		}
-
+		ClassLoader userCodeClassLoader = getUserCodeClassLoader();
 		// obtain task configuration (including stub parameters)
 		Configuration taskConf = getTaskConfiguration();
 		this.config = new TaskConfig(taskConf);
 
 		try {
-			this.format = config.<InputFormat<OT, InputSplit>>getStubWrapper(this.userCodeClassLoader)
-					.getUserCodeObject(InputFormat.class, this.userCodeClassLoader);
+			this.format = config.<InputFormat<OT, InputSplit>>getStubWrapper(userCodeClassLoader)
+					.getUserCodeObject(InputFormat.class, userCodeClassLoader);
 
 			// check if the class is a subclass, if the check is required
 			if (!InputFormat.class.isAssignableFrom(this.format.getClass())) {
@@ -336,7 +314,7 @@ l	 *
 		}
 		
 		// get the factory for the type serializer
-		this.serializerFactory = this.config.getOutputSerializer(this.userCodeClassLoader);
+		this.serializerFactory = this.config.getOutputSerializer(userCodeClassLoader);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
index c3a2d59..f6d46fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
@@ -30,7 +30,7 @@ import org.apache.flink.util.MutableObjectIterator;
 
 
 /**
- * A runtime task is the task that is executed by the nephele engine inside a task vertex.
+ * A runtime task is the task that is executed by the flink engine inside a task vertex.
  * It typically has a {@link PactDriver}, and optionally multiple chained drivers. In addition, it
  * deals with the runtime setup and teardown and the control-flow logic. The later appears especially
  * in the case of iterations.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 47ae012..35908a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -37,7 +37,6 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.accumulators.AccumulatorEvent;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.BufferWriter;
 import org.apache.flink.runtime.io.network.api.ChannelSelector;
@@ -180,11 +179,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	protected TaskConfig config;
 
 	/**
-	 * The class loader used to instantiate user code and user data types.
-	 */
-	protected ClassLoader userCodeClassLoader;
-
-	/**
 	 * A list of chained drivers, if there are any.
 	 */
 	protected ArrayList<ChainedDriver<?, ?>> chainedTasks;
@@ -231,16 +225,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 			LOG.debug(formatLogString("Start registering input and output."));
 		}
 
-		// get the classloader first. the classloader might have been set before by mock environments during testing
-		if (this.userCodeClassLoader == null) {
-			try {
-				this.userCodeClassLoader = LibraryCacheManager.getClassLoader(getEnvironment().getJobID());
-			}
-			catch (IOException ioe) {
-				throw new RuntimeException("The ClassLoader for the user code could not be instantiated from the library cache.", ioe);
-			}
-		}
-
 		// obtain task configuration (including stub parameters)
 		Configuration taskConf = getTaskConfiguration();
 		this.config = new TaskConfig(taskConf);
@@ -407,16 +391,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		}
 	}
 
-
-	/**
-	 * Sets the class-loader to be used to load the user code.
-	 *
-	 * @param cl The class-loader to be used to load the user code.
-	 */
-	public void setUserCodeClassLoader(ClassLoader cl) {
-		this.userCodeClassLoader = cl;
-	}
-
 	// --------------------------------------------------------------------------------------------
 	//                                  Main Work Methods
 	// --------------------------------------------------------------------------------------------
@@ -694,7 +668,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 
 	protected S initStub(Class<? super S> stubSuperClass) throws Exception {
 		try {
-			S stub = config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(stubSuperClass, this.userCodeClassLoader);
+			ClassLoader userCodeClassLoader = getUserCodeClassLoader();
+			S stub = config.<S>getStubWrapper(userCodeClassLoader).getUserCodeObject(stubSuperClass, userCodeClassLoader);
 			// check if the class is a subclass, if the check is required
 			if (stubSuperClass != null && !stubSuperClass.isAssignableFrom(stub.getClass())) {
 				throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" + 
@@ -785,11 +760,12 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		this.inputSerializers = new TypeSerializerFactory<?>[numInputs];
 		this.inputComparators = numComparators > 0 ? new TypeComparator[numComparators] : null;
 		this.inputIterators = new MutableObjectIterator[numInputs];
+
+		ClassLoader userCodeClassLoader = getUserCodeClassLoader();
 		
-		//  ---------------- create the input serializers  ---------------------
 		for (int i = 0; i < numInputs; i++) {
 			
-			final TypeSerializerFactory<?> serializerFactory = this.config.getInputSerializer(i, this.userCodeClassLoader);
+			final TypeSerializerFactory<?> serializerFactory = this.config.getInputSerializer(i, userCodeClassLoader);
 			this.inputSerializers[i] = serializerFactory;
 			
 			this.inputIterators[i] = createInputIterator(this.inputReaders[i], this.inputSerializers[i]);
@@ -799,7 +775,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		for (int i = 0; i < numComparators; i++) {
 			
 			if (this.inputComparators != null) {
-				final TypeComparatorFactory<?> comparatorFactory = this.config.getDriverComparator(i, this.userCodeClassLoader);
+				final TypeComparatorFactory<?> comparatorFactory = this.config.getDriverComparator(i, userCodeClassLoader);
 				this.inputComparators[i] = comparatorFactory.createComparator();
 			}
 		}
@@ -812,9 +788,11 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		this.broadcastInputSerializers = new TypeSerializerFactory[numBroadcastInputs];
 		this.broadcastInputIterators = new MutableObjectIterator[numBroadcastInputs];
 
+		ClassLoader userCodeClassLoader = getUserCodeClassLoader();
+
 		for (int i = 0; i < numBroadcastInputs; i++) {
 			//  ---------------- create the serializer first ---------------------
-			final TypeSerializerFactory<?> serializerFactory = this.config.getBroadcastInputSerializer(i, this.userCodeClassLoader);
+			final TypeSerializerFactory<?> serializerFactory = this.config.getBroadcastInputSerializer(i, userCodeClassLoader);
 			this.broadcastInputSerializers[i] = serializerFactory;
 
 			this.broadcastInputIterators[i] = createInputIterator(this.broadcastInputReaders[i], this.broadcastInputSerializers[i]);
@@ -1025,7 +1003,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	}
 
 	private <T> TypeComparator<T> getLocalStrategyComparator(int inputNum) throws Exception {
-		TypeComparatorFactory<T> compFact = this.config.getInputComparator(inputNum, this.userCodeClassLoader);
+		TypeComparatorFactory<T> compFact = this.config.getInputComparator(inputNum, getUserCodeClassLoader());
 		if (compFact == null) {
 			throw new Exception("Missing comparator factory for local strategy on input " + inputNum);
 		}
@@ -1060,12 +1038,16 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	protected void initOutputs() throws Exception {
 		this.chainedTasks = new ArrayList<ChainedDriver<?, ?>>();
 		this.eventualOutputs = new ArrayList<BufferWriter>();
-		this.output = initOutputs(this, this.userCodeClassLoader, this.config, this.chainedTasks, this.eventualOutputs);
+
+		ClassLoader userCodeClassLoader = getUserCodeClassLoader();
+
+		this.output = initOutputs(this, userCodeClassLoader, this.config, this.chainedTasks, this.eventualOutputs);
 	}
 
 	public RuntimeUDFContext createRuntimeContext(String taskName) {
 		Environment env = getEnvironment();
-		return new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader, env.getCopyTask());
+		return new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(),
+				env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), env.getCopyTask());
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -1078,11 +1060,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	}
 
 	@Override
-	public ClassLoader getUserCodeClassLoader() {
-		return this.userCodeClassLoader;
-	}
-
-	@Override
 	public MemoryManager getMemoryManager() {
 		return getEnvironment().getMemoryManager();
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagementProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagementProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagementProtocol.java
index 8cc9d68..db77083 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagementProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagementProtocol.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.protocols;
 
 import java.io.IOException;
 
-import org.apache.flink.core.protocols.VersionedProtocol;
 import org.apache.flink.runtime.client.JobCancelResult;
 import org.apache.flink.runtime.client.JobProgressResult;
 import org.apache.flink.runtime.client.JobSubmissionResult;
@@ -31,7 +30,7 @@ import org.apache.flink.runtime.types.IntegerRecord;
 /**
  * The JobManagementProtocol specifies methods required to manage jobs from a job client.
  */
-public interface JobManagementProtocol extends VersionedProtocol {
+public interface JobManagementProtocol extends ServiceDiscoveryProtocol {
 
 	/**
 	 * Submits the specified job to the job manager.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagerProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagerProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagerProtocol.java
index 6391a27..ca02459 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagerProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/JobManagerProtocol.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.protocols;
 
 import java.io.IOException;
 
-import org.apache.flink.core.protocols.VersionedProtocol;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -31,7 +30,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
  * to task managers which allows them to register themselves, send heart beat messages
  * or to report the results of a task execution.
  */
-public interface JobManagerProtocol extends VersionedProtocol {
+public interface JobManagerProtocol extends ServiceDiscoveryProtocol {
 
 	/**
 	 * Sends a heart beat to the job manager.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/ServiceDiscoveryProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/ServiceDiscoveryProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/ServiceDiscoveryProtocol.java
new file mode 100644
index 0000000..ef0427f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/ServiceDiscoveryProtocol.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.protocols;
+
+import org.apache.flink.core.protocols.VersionedProtocol;
+
+/**
+ * The service discovery protocol enables different components of the Flink distributed runtime to query and discover
+ * the network locations of auxiliary services.
+ */
+public interface ServiceDiscoveryProtocol extends VersionedProtocol {
+
+	/**
+	 * Returns the network port of the job manager's BLOB server.
+	 * 
+	 * @return the port of the job manager's BLOB server or <code>-1</code> if the job manager does not run a BLOB
+	 *         server
+	 */
+	int getBlobServerPort();
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java
index 56c233b..7e39047 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java
@@ -22,9 +22,6 @@ import java.io.IOException;
 
 import org.apache.flink.core.protocols.VersionedProtocol;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileRequest;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileResponse;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheUpdate;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.taskmanager.TaskOperationResult;
 
@@ -39,7 +36,5 @@ public interface TaskOperationProtocol extends VersionedProtocol {
 
 	TaskOperationResult cancelTask(ExecutionAttemptID executionId) throws IOException;
 
-	LibraryCacheProfileResponse getLibraryCacheProfile(LibraryCacheProfileRequest request) throws IOException;
 	
-	void updateLibraryCache(LibraryCacheUpdate update) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index ae39d04..44327e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -59,13 +59,13 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.protocols.VersionedProtocol;
 import org.apache.flink.runtime.ExecutionMode;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.RuntimeEnvironment;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileRequest;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileResponse;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheUpdate;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.instance.Hardware;
@@ -138,6 +138,7 @@ public class TaskManager implements TaskOperationProtocol {
 	
 	private final AccumulatorProtocol accumulatorProtocolProxy;
 
+	private final LibraryCacheManager libraryCacheManager;
 	
 	private final Server taskManagerServer;
 
@@ -333,6 +334,22 @@ public class TaskManager implements TaskOperationProtocol {
 		
 		this.hardwareDescription = HardwareDescription.extractFromSystem(this.memoryManager.getMemorySize());
 
+		// Determine the port of the BLOB server and register it with the library cache manager
+		{
+			final int blobPort = this.jobManager.getBlobServerPort();
+
+			if (blobPort == -1) {
+				LOG.warn("Unable to determine BLOB server address: User library download will not be available");
+				this.libraryCacheManager = new FallbackLibraryCacheManager();
+			} else {
+				final InetSocketAddress blobServerAddress = new InetSocketAddress(
+					jobManagerAddress.getAddress(), blobPort);
+				LOG.info("Determined BLOB server address to be " + blobServerAddress);
+
+				this.libraryCacheManager = new BlobLibraryCacheManager(new BlobCache
+						(blobServerAddress), GlobalConfiguration.getConfiguration());
+			}
+		}
 		this.ioManager = new IOManager(tmpDirPaths);
 		
 		// start the heart beats
@@ -447,6 +464,14 @@ public class TaskManager implements TaskOperationProtocol {
 			this.memoryManager.shutdown();
 		}
 
+		if(libraryCacheManager != null){
+			try {
+				this.libraryCacheManager.shutdown();
+			} catch (IOException e) {
+				LOG.warn("Could not properly shutdown the library cache manager.", e);
+			}
+		}
+
 		this.fileCache.shutdown();
 
 		// Shut down the executor service
@@ -548,11 +573,13 @@ public class TaskManager implements TaskOperationProtocol {
 		boolean jarsRegistered = false;
 		
 		try {
+			// Now register data with the library manager
+			libraryCacheManager.register(jobID, tdd.getRequiredJarFiles());
+
 			// library and classloader issues first
-			LibraryCacheManager.register(jobID, tdd.getRequiredJarFiles());
 			jarsRegistered = true;
-			
-			final ClassLoader userCodeClassLoader = LibraryCacheManager.getClassLoader(jobID);
+
+			final ClassLoader userCodeClassLoader = libraryCacheManager.getClassLoader(jobID);
 			if (userCodeClassLoader == null) {
 				throw new Exception("No user code ClassLoader available.");
 			}
@@ -619,13 +646,7 @@ public class TaskManager implements TaskOperationProtocol {
 			LOG.error("Could not instantiate task", t);
 			
 			if (jarsRegistered) {
-				try {
-					LibraryCacheManager.unregister(jobID);
-				} catch (IOException e) {
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("Unregistering the execution " + executionId + " caused an IOException");
-					}
-				}
+				libraryCacheManager.unregister(jobID);
 			}
 			
 			return new TaskOperationResult(executionId, false, ExceptionUtils.stringifyException(t));
@@ -664,14 +685,7 @@ public class TaskManager implements TaskOperationProtocol {
 		task.unregisterMemoryManager(this.memoryManager);
 
 		// Unregister task from library cache manager
-		try {
-			LibraryCacheManager.unregister(task.getJobID());
-		}
-		catch (Throwable t) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Unregistering the cached libraries caused an exception: ",  t);
-			}
-		}
+		libraryCacheManager.unregister(task.getJobID());
 	}
 
 	public void notifyExecutionStateChange(JobID jobID, ExecutionAttemptID executionId, ExecutionState newExecutionState, Throwable optionalError) {
@@ -710,34 +724,7 @@ public class TaskManager implements TaskOperationProtocol {
 			}
 		}
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Library caching
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public LibraryCacheProfileResponse getLibraryCacheProfile(LibraryCacheProfileRequest request) throws IOException {
-
-		LibraryCacheProfileResponse response = new LibraryCacheProfileResponse(request);
-		String[] requiredLibraries = request.getRequiredLibraries();
 
-		for (int i = 0; i < requiredLibraries.length; i++) {
-			if (LibraryCacheManager.contains(requiredLibraries[i]) == null) {
-				response.setCached(i, false);
-			} else {
-				response.setCached(i, true);
-			}
-		}
-
-		return response;
-	}
-	
-	@Override
-	public void updateLibraryCache(LibraryCacheUpdate update) throws IOException {
-		// Nothing to to here, because the libraries are added to the cache when the
-		// update is deserialized (WE SHOULD CHANGE THAT!!!)
-	}
-	
 	// --------------------------------------------------------------------------------------------
 	//  Heartbeats
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java
new file mode 100644
index 0000000..d456135
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+/**
+ * This class contains unit tests for the {@link BlobCache}.
+ */
+public class BlobCacheTest {
+
+	@Test
+	public void testBlobCache() {
+
+		// First create two BLOBs and upload them to BLOB server
+		final byte[] buf = new byte[128];
+		final List<BlobKey> blobKeys = new ArrayList<BlobKey>(2);
+
+		BlobServer blobServer = null;
+		BlobCache blobCache = null;
+		try {
+
+			// Start the BLOB server
+			blobServer = new BlobServer();
+			final InetSocketAddress serverAddress = new InetSocketAddress(blobServer.getServerPort());
+
+			// Upload BLOBs
+			BlobClient blobClient = null;
+			try {
+
+				blobClient = new BlobClient(serverAddress);
+
+				blobKeys.add(blobClient.put(buf));
+				buf[0] = 1; // Make sure the BLOB key changes
+				blobKeys.add(blobClient.put(buf));
+			} finally {
+				if (blobClient != null) {
+					blobClient.close();
+				}
+			}
+
+			blobCache = new BlobCache(serverAddress);
+
+			for(int i = 0; i < blobKeys.size(); i++){
+				blobCache.getURL(blobKeys.get(i));
+			}
+
+			// Now, shut down the BLOB server, the BLOBs must still be accessible through the cache.
+			blobServer.shutdown();
+			blobServer = null;
+
+			final URL[] urls = new URL[blobKeys.size()];
+
+			for(int i = 0; i < blobKeys.size(); i++){
+				urls[i] = blobCache.getURL(blobKeys.get(i));
+			}
+
+			// Verify the result
+			assertEquals(blobKeys.size(), urls.length);
+
+			for (int i = 0; i < urls.length; ++i) {
+
+				final URL url = urls[i];
+
+				assertNotNull(url);
+
+				try {
+					final File cachedFile = new File(url.toURI());
+
+					assertTrue(cachedFile.exists());
+					assertEquals(buf.length, cachedFile.length());
+
+				} catch (URISyntaxException e) {
+					fail(e.getMessage());
+				}
+
+			}
+
+		} catch (IOException ioe) {
+			fail(ioe.getMessage());
+		} finally {
+			if (blobServer != null) {
+				try {
+					blobServer.shutdown();
+				} catch (IOException e) {
+					e.printStackTrace();
+				}
+			}
+
+			if(blobCache != null){
+				try {
+					blobCache.shutdown();
+				} catch (IOException e) {
+					e.printStackTrace();
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
new file mode 100644
index 0000000..2542bbb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.security.MessageDigest;
+
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.util.StringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * This class contains unit tests for the {@link BlobClient}.
+ */
+public class BlobClientTest {
+
+	/**
+	 * The buffer size used during the tests in bytes.
+	 */
+	private static final int TEST_BUFFER_SIZE = 17 * 1000;
+
+	/**
+	 * The instance of the BLOB server used during the tests.
+	 */
+	private static BlobServer BLOB_SERVER;
+
+	/**
+	 * Starts the BLOB server.
+	 */
+	@BeforeClass
+	public static void startServer() {
+
+		try {
+			BLOB_SERVER = new BlobServer();
+		} catch (IOException ioe) {
+			fail(StringUtils.stringifyException(ioe));
+		}
+
+	}
+
+	/**
+	 * Shuts the BLOB server down.
+	 */
+	@AfterClass
+	public static void stopServer() {
+
+		if (BLOB_SERVER != null) {
+			try {
+				BLOB_SERVER.shutdown();
+			} catch (IOException e) {
+				e.printStackTrace();
+			}
+		}
+	}
+
+	/**
+	 * Creates a test buffer and fills it with a specific byte pattern.
+	 * 
+	 * @return a test buffer filled with a specific byte pattern
+	 */
+	private static byte[] createTestBuffer() {
+
+		final byte[] buf = new byte[TEST_BUFFER_SIZE];
+
+		for (int i = 0; i < buf.length; ++i) {
+			buf[i] = (byte) (i % 128);
+		}
+
+		return buf;
+	}
+
+	/**
+	 * Prepares a test file for the unit tests, i.e. the method fills the file with a particular byte pattern and
+	 * computes the file's BLOB key.
+	 * 
+	 * @param file
+	 *        the file to prepare for the unit tests
+	 * @return the BLOB key of the prepared file
+	 * @throws IOException
+	 *         thrown if an I/O error occurs while writing to the test file
+	 */
+	private static BlobKey prepareTestFile(final File file) throws IOException {
+
+		MessageDigest md = BlobUtils.createMessageDigest();
+
+		final byte[] buf = new byte[TEST_BUFFER_SIZE];
+		for (int i = 0; i < buf.length; ++i) {
+			buf[i] = (byte) (i % 128);
+		}
+
+		FileOutputStream fos = null;
+		try {
+			fos = new FileOutputStream(file);
+
+			for (int i = 0; i < 20; ++i) {
+				fos.write(buf);
+				md.update(buf);
+			}
+
+		} finally {
+			if (fos != null) {
+				fos.close();
+			}
+		}
+
+		return new BlobKey(md.digest());
+	}
+
+	/**
+	 * Validates the result of a GET operation by comparing the data from the retrieved input stream to the content of
+	 * the specified buffer.
+	 * 
+	 * @param inputStream
+	 *        the input stream returned from the GET operation
+	 * @param buf
+	 *        the buffer to compare the input stream's data to
+	 * @throws IOException
+	 *         thrown if an I/O error occurs while reading the input stream
+	 */
+	private static void validateGet(final InputStream inputStream, final byte[] buf) throws IOException {
+
+		int bytesReceived = 0;
+
+		while (true) {
+
+			final int read = inputStream.read(buf, bytesReceived, buf.length - bytesReceived);
+			if (read < 0) {
+				throw new EOFException();
+			}
+			bytesReceived += read;
+
+			if (bytesReceived == buf.length) {
+				assertEquals(-1, inputStream.read());
+				return;
+			}
+		}
+	}
+
+	/**
+	 * Validates the result of a GET operation by comparing the data from the retrieved input stream to the content of
+	 * the specified file.
+	 * 
+	 * @param inputStream
+	 *        the input stream returned from the GET operation
+	 * @param file
+	 *        the file to compare the input stream's data to
+	 * @throws IOException
+	 *         thrown if an I/O error occurs while reading the input stream or the file
+	 */
+	private static void validateGet(final InputStream inputStream, final File file) throws IOException {
+
+		InputStream inputStream2 = null;
+		try {
+
+			inputStream2 = new FileInputStream(file);
+
+			while (true) {
+
+				final int r1 = inputStream.read();
+				final int r2 = inputStream2.read();
+
+				assertEquals(r2, r1);
+
+				if (r1 < 0) {
+					break;
+				}
+			}
+
+		} finally {
+			if (inputStream2 != null) {
+				inputStream2.close();
+			}
+		}
+
+	}
+
+	/**
+	 * Tests the PUT/GET operations for content-addressable buffers.
+	 */
+	@Test
+	public void testContentAddressableBuffer() {
+
+		final byte[] testBuffer = createTestBuffer();
+		final MessageDigest md = BlobUtils.createMessageDigest();
+		md.update(testBuffer);
+		final BlobKey origKey = new BlobKey(md.digest());
+
+		try {
+
+			BlobClient client = null;
+			try {
+
+				final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getServerPort());
+				client = new BlobClient(serverAddress);
+
+				// Store the data
+				final BlobKey receivedKey = client.put(testBuffer);
+				assertEquals(origKey, receivedKey);
+
+				// Retrieve the data
+				final InputStream is = client.get(receivedKey);
+				validateGet(is, testBuffer);
+
+				// Check reaction to invalid keys
+				try {
+					client.get(new BlobKey());
+				} catch (FileNotFoundException fnfe) {
+					return;
+				}
+
+				fail("Expected FileNotFoundException did not occur");
+
+			} finally {
+				if (client != null) {
+					client.close();
+				}
+			}
+
+		} catch (IOException ioe) {
+			fail(StringUtils.stringifyException(ioe));
+		}
+	}
+
+	/**
+	 * Tests the PUT/GET operations for content-addressable streams.
+	 */
+	@Test
+	public void testContentAddressableStream() {
+
+		try {
+
+			final File testFile = File.createTempFile("testfile", ".dat");
+			testFile.deleteOnExit();
+			final BlobKey origKey = prepareTestFile(testFile);
+
+			BlobClient client = null;
+			InputStream is = null;
+			try {
+
+				final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getServerPort());
+				client = new BlobClient(serverAddress);
+
+				// Store the data
+				is = new FileInputStream(testFile);
+				final BlobKey receivedKey = client.put(is);
+				assertEquals(origKey, receivedKey);
+
+				is.close();
+				is = null;
+
+				// Retrieve the data
+				is = client.get(receivedKey);
+				validateGet(is, testFile);
+
+			} finally {
+				if (is != null) {
+					is.close();
+				}
+				if (client != null) {
+					client.close();
+				}
+			}
+
+		} catch (IOException ioe) {
+			fail(StringUtils.stringifyException(ioe));
+		}
+	}
+
+	/**
+	 * Tests the PUT/GET operations for regular (non-content-addressable) buffers.
+	 */
+	@Test
+	public void testRegularBuffer() {
+
+		final byte[] testBuffer = createTestBuffer();
+		final JobID jobID = JobID.generate();
+		final String key = "testkey";
+
+		try {
+
+			BlobClient client = null;
+			try {
+
+				final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getServerPort());
+				client = new BlobClient(serverAddress);
+
+				// Store the data
+				client.put(jobID, key, testBuffer);
+
+				// Retrieve the data
+				final InputStream is = client.get(jobID, key);
+				validateGet(is, testBuffer);
+
+				// Delete the data
+				client.delete(jobID, key);
+				
+				// Check if the BLOB is still available
+				try {
+					client.get(jobID, key);
+				} catch (FileNotFoundException fnfe) {
+					return;
+				}
+
+				fail("Expected FileNotFoundException did not occur");
+
+			} finally {
+				if (client != null) {
+					client.close();
+				}
+			}
+
+		} catch (IOException ioe) {
+			fail(StringUtils.stringifyException(ioe));
+		}
+	}
+
+	/**
+	 * Tests the PUT/GET operations for regular (non-content-addressable) streams.
+	 */
+	@Test
+	public void testRegularStream() {
+
+		final JobID jobID = JobID.generate();
+		final String key = "testkey3";
+
+		try {
+			final File testFile = File.createTempFile("testfile", ".dat");
+			testFile.deleteOnExit();
+			prepareTestFile(testFile);
+
+			BlobClient client = null;
+			InputStream is = null;
+			try {
+
+				final InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getServerPort());
+				client = new BlobClient(serverAddress);
+
+				// Store the data
+				is = new FileInputStream(testFile);
+				client.put(jobID, key, is);
+
+				is.close();
+				is = null;
+
+				// Retrieve the data
+				is = client.get(jobID, key);
+				validateGet(is, testFile);
+
+			} finally {
+				if (is != null) {
+					is.close();
+				}
+				if (client != null) {
+					client.close();
+				}
+			}
+
+		} catch (IOException ioe) {
+			fail(StringUtils.stringifyException(ioe));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
new file mode 100644
index 0000000..879bd37
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.flink.runtime.testutils.ServerTestUtils;
+import org.apache.flink.util.StringUtils;
+import org.junit.Test;
+
+/**
+ * This class contains unit tests for the {@link BlobKey} class.
+ */
+public final class BlobKeyTest {
+	/**
+	 * The first key array to be used during the unit tests.
+	 */
+	private static final byte[] KEY_ARRAY_1 = new byte[20];
+
+	/**
+	 * The second key array to be used during the unit tests.
+	 */
+	private static final byte[] KEY_ARRAY_2 = new byte[20];
+	/**
+	 * Initialize the key arrays.
+	 */
+	static {
+		for (int i = 0; i < KEY_ARRAY_1.length; ++i) {
+			KEY_ARRAY_1[i] = (byte) i;
+			KEY_ARRAY_2[i] = (byte) (i + 1);
+		}
+	}
+
+	/**
+	 * Tests the serialization/deserialization of BLOB keys using the regular {@link IOReadableWritable} API.
+	 */
+	@Test
+	public void testSerialization() {
+		final BlobKey k1 = new BlobKey(KEY_ARRAY_1);
+		final BlobKey k2;
+		try {
+			k2 = ServerTestUtils.createCopy(k1);
+		} catch (IOException ioe) {
+			fail(StringUtils.stringifyException(ioe));
+			return;
+		}
+		assertEquals(k1, k2);
+		assertEquals(k1.hashCode(), k2.hashCode());
+		assertEquals(0, k1.compareTo(k2));
+	}
+
+	/**
+	 * Tests the equals method.
+	 */
+	@Test
+	public void testEquals() {
+		final BlobKey k1 = new BlobKey(KEY_ARRAY_1);
+		final BlobKey k2 = new BlobKey(KEY_ARRAY_1);
+		final BlobKey k3 = new BlobKey(KEY_ARRAY_2);
+		assertTrue(k1.equals(k2));
+		assertFalse(k1.equals(k3));
+	}
+
+	/**
+	 * Tests the compareTo method.
+	 */
+	@Test
+	public void testCompares() {
+		final BlobKey k1 = new BlobKey(KEY_ARRAY_1);
+		final BlobKey k2 = new BlobKey(KEY_ARRAY_1);
+		final BlobKey k3 = new BlobKey(KEY_ARRAY_2);
+		assertTrue(k1.compareTo(k2) == 0);
+		assertTrue(k1.compareTo(k3) < 0);
+	}
+
+	/**
+	 * Test the serialization/deserialization using input/output streams.
+	 */
+	@Test
+	public void testStreams() {
+		final BlobKey k1 = new BlobKey(KEY_ARRAY_1);
+		final ByteArrayOutputStream baos = new ByteArrayOutputStream(20);
+		try {
+			k1.writeToOutputStream(baos);
+			baos.close();
+		} catch (IOException ioe) {
+			fail(StringUtils.stringifyException(ioe));
+			return;
+		}
+		final ByteArrayInputStream bais = new ByteArrayInputStream(
+			baos.toByteArray());
+		final BlobKey k2;
+		try {
+			k2 = BlobKey.readFromInputStream(bais);
+		} catch (IOException ioe) {
+			fail(StringUtils.stringifyException(ioe));
+			return;
+		}
+		assertEquals(k1, k2);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index e8ed812..166353f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -20,14 +20,13 @@ package org.apache.flink.runtime.deployment;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -37,7 +36,6 @@ import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.junit.Test;
 
 public class TaskDeploymentDescriptorTest {
-
 	@Test
 	public void testSerialization() {
 		try {
@@ -52,10 +50,11 @@ public class TaskDeploymentDescriptorTest {
 			final Class<? extends AbstractInvokable> invokableClass = RegularPactTask.class;
 			final List<GateDeploymentDescriptor> outputGates = new ArrayList<GateDeploymentDescriptor>(0);
 			final List<GateDeploymentDescriptor> inputGates = new ArrayList<GateDeploymentDescriptor>(0);
+			final List<BlobKey> requiredJars = new ArrayList<BlobKey>(0);
 	
 			final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, vertexID, execId, taskName,
 				indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration,
-				invokableClass.getName(), outputGates, inputGates, new String[] { "jar1", "jar2" }, 47);
+				invokableClass.getName(), outputGates, inputGates, requiredJars, 47);
 	
 			final TaskDeploymentDescriptor copy = CommonTestUtils.createCopyWritable(orig);
 	
@@ -72,8 +71,8 @@ public class TaskDeploymentDescriptorTest {
 			assertEquals(orig.getCurrentNumberOfSubtasks(), copy.getCurrentNumberOfSubtasks());
 			assertEquals(orig.getOutputGates(), copy.getOutputGates());
 			assertEquals(orig.getInputGates(), copy.getInputGates());
-			
-			assertTrue(Arrays.equals(orig.getRequiredJarFiles(), copy.getRequiredJarFiles()));
+
+			assertEquals(orig.getRequiredJarFiles(), copy.getRequiredJarFiles());
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
new file mode 100644
index 0000000..606fff1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution.librarycache;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.blob.BlobCache;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobService;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests that the {@link BlobLibraryCacheManager} removes a job's cached BLOB files
+ * after the job has been unregistered and the cleanup interval has elapsed.
+ */
+public class BlobLibraryCacheManagerTest {
+
+	@Test
+	public void testLibraryCacheManagerCleanup(){
+		Configuration config = new Configuration();
+
+		// Set the cleanup interval to 1 second so the periodic cleanup task
+		// runs during the test instead of at its (much longer) default interval.
+		config.setLong(ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, 1);
+		GlobalConfiguration.includeConfiguration(config);
+
+		JobID jid = new JobID();
+		List<BlobKey> keys = new ArrayList<BlobKey>();
+		BlobServer server = null;
+		LibraryCacheManager libraryCacheManager = null;
+
+		final byte[] buf = new byte[128];
+
+		try {
+			server = new BlobServer();
+			InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getServerPort());
+			BlobClient bc = new BlobClient(blobSocketAddress);
+
+			// Upload two distinct BLOBs (mutating buf[0] makes the second
+			// content differ, so it yields a different BlobKey).
+			keys.add(bc.put(buf));
+			buf[0] += 1;
+			keys.add(bc.put(buf));
+
+			libraryCacheManager = new BlobLibraryCacheManager(server, GlobalConfiguration.getConfiguration());
+			libraryCacheManager.register(jid, keys);
+
+			List<File> files = new ArrayList<File>();
+
+			// While the job is registered, both BLOBs must be retrievable as files.
+			for(BlobKey key: keys){
+				files.add(libraryCacheManager.getFile(key));
+			}
+
+			assertEquals(2, files.size());
+			files.clear();
+
+			libraryCacheManager.unregister(jid);
+
+			// Give the periodic cleanup task time to run (interval is 1s).
+			// NOTE(review): timing-sensitive — 1500ms may be tight on a loaded
+			// CI machine; confirm this margin is sufficient.
+			Thread.sleep(1500);
+
+			int caughtExceptions = 0;
+
+			for (BlobKey key : keys) {
+				// the blob cache should no longer contain the files
+				try {
+					files.add(libraryCacheManager.getFile(key));
+				} catch (IOException ioe) {
+					caughtExceptions++;
+				}
+			}
+
+			// Every lookup after cleanup is expected to fail with an IOException.
+			assertEquals(2, caughtExceptions);
+		}catch(Exception e){
+			e.printStackTrace();
+			fail(e.getMessage());
+		}finally{
+			// Best-effort teardown; shutdown failures are logged, not rethrown,
+			// so one failing shutdown does not mask the test result.
+			// NOTE(review): the server is shut down before the cache manager —
+			// confirm the manager's shutdown does not require a live server.
+			if(server != null){
+				try {
+					server.shutdown();
+				} catch (IOException e) {
+					e.printStackTrace();
+				}
+			}
+
+			if(libraryCacheManager != null){
+				try {
+					libraryCacheManager.shutdown();
+				} catch (IOException e) {
+					e.printStackTrace();
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 963f8c9..74ab08b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -27,15 +27,17 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.doAnswer;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
@@ -121,7 +123,6 @@ public class ExecutionGraphDeploymentTest {
 			
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			
-			LibraryCacheManager.register(jobId, new String[0]);
 			vertex.deployToSlot(slot);
 			
 			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
@@ -264,7 +265,6 @@ public class ExecutionGraphDeploymentTest {
 		assertEquals(dop1 + dop2, scheduler.getNumberOfAvailableSlots());
 		
 		// schedule, this triggers mock deployment
-		LibraryCacheManager.register(jobId, new String[0]);
 		eg.scheduleForExecution(scheduler);
 		
 		Map<ExecutionAttemptID, Execution> executions = eg.getRegisteredExecutions();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 4db84ec..f5a4d39 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -22,10 +22,12 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.mock;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.JobID;
@@ -54,8 +56,6 @@ public class ExecutionStateProgressTest {
 			
 			ExecutionJobVertex ejv = graph.getJobVertex(vid);
 			
-			LibraryCacheManager.register(jid, new String[0]);
-			
 			// mock resources and mock taskmanager
 			TaskOperationProtocol taskManager = getSimpleAcknowledgingTaskmanager();
 			for (ExecutionVertex ee : ejv.getTaskVertices()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 31d7542..1f74ae3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -26,10 +26,12 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.times;
 
 import java.io.IOException;
+import java.util.ArrayList;
 
+import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobID;
@@ -123,7 +125,6 @@ public class ExecutionVertexCancelTest {
 			Instance instance = getInstance(taskManager);
 			AllocatedSlot slot = instance.allocateSlot(new JobID());
 			
-			LibraryCacheManager.register(ejv.getJobId(), new String[0]);
 			vertex.deployToSlot(slot);
 			
 			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
@@ -197,7 +198,6 @@ public class ExecutionVertexCancelTest {
 			Instance instance = getInstance(taskManager);
 			AllocatedSlot slot = instance.allocateSlot(new JobID());
 			
-			LibraryCacheManager.register(ejv.getJobId(), new String[0]);
 			vertex.deployToSlot(slot);
 			
 			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 2f44981..efb2af4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -28,9 +28,10 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.times;
 import static org.mockito.Matchers.any;
 
+import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobID;
@@ -42,6 +43,8 @@ import org.junit.Test;
 
 import org.mockito.Matchers;
 
+import java.util.ArrayList;
+
 public class ExecutionVertexDeploymentTest {
 	
 	@Test
@@ -99,7 +102,6 @@ public class ExecutionVertexDeploymentTest {
 			
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			
-			LibraryCacheManager.register(vertex.getJobId(), new String[0]);
 			vertex.deployToSlot(slot);
 			
 			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
@@ -144,7 +146,6 @@ public class ExecutionVertexDeploymentTest {
 			
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			
-			LibraryCacheManager.register(vertex.getJobId(), new String[0]);
 			vertex.deployToSlot(slot);
 			
 			// no repeated scheduling
@@ -202,7 +203,6 @@ public class ExecutionVertexDeploymentTest {
 			
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			
-			LibraryCacheManager.register(vertex.getJobId(), new String[0]);
 			vertex.deployToSlot(slot);
 			
 			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
@@ -238,7 +238,6 @@ public class ExecutionVertexDeploymentTest {
 			
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			
-			LibraryCacheManager.register(vertex.getJobId(), new String[0]);
 			vertex.deployToSlot(slot);
 			
 			// wait until the state transition must be done
@@ -327,7 +326,6 @@ public class ExecutionVertexDeploymentTest {
 			
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			
-			LibraryCacheManager.register(vertex.getJobId(), new String[0]);
 			vertex.deployToSlot(slot);
 			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
index 411fc4e..498a6f8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
@@ -164,5 +164,10 @@ public class LocalInstanceManagerTest {
 		public InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription, int numberOfSlots) {
 			return new InstanceID();
 		}
+
+		@Override
+		public int getBlobServerPort() {
+			// NOTE(review): stub returns 0 — presumably no BLOB server runs in
+			// this test fixture; verify callers tolerate this placeholder port.
+			return 0;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
index 27a3708..f8b229f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
@@ -22,9 +22,10 @@ import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.client.AbstractJobResult;
 import org.apache.flink.runtime.client.JobSubmissionResult;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.LocalInstanceManager;
 import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
@@ -37,6 +38,8 @@ import org.apache.flink.runtime.jobmanager.tasks.Receiver;
 import org.apache.flink.runtime.jobmanager.tasks.Sender;
 import org.junit.Test;
 
+import java.util.ArrayList;
+
 public class CoLocationConstraintITCase {
 	
 	/**
@@ -73,9 +76,6 @@ public class CoLocationConstraintITCase {
 					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
 			
 			try {
-				// we need to register the job at the library cache manager (with no libraries)
-				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
-				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 
 				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index f4e442b..0952f60 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -24,9 +24,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.client.AbstractJobResult;
 import org.apache.flink.runtime.client.JobSubmissionResult;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.LocalInstanceManager;
 import org.apache.flink.runtime.io.network.api.RecordReader;
@@ -46,6 +47,8 @@ import org.apache.flink.runtime.jobmanager.tasks.Sender;
 import org.apache.flink.runtime.types.IntegerRecord;
 import org.junit.Test;
 
+import java.util.ArrayList;
+
 /**
  * This test is intended to cover the basic functionality of the {@link JobManager}.
  */
@@ -70,9 +73,6 @@ public class JobManagerITCase {
 				
 				assertEquals(1, jm.getTotalNumberOfRegisteredSlots());
 				
-				// we need to register the job at the library cache manager (with no libraries)
-				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
-				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 				assertEquals(AbstractJobResult.ReturnCode.ERROR, result.getReturnCode());
 				
@@ -143,9 +143,6 @@ public class JobManagerITCase {
 				
 				assertEquals(NUM_TASKS, jm.getTotalNumberOfRegisteredSlots());
 				
-				// we need to register the job at the library cache manager (with no libraries)
-				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
-				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 				
 				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
@@ -216,10 +213,6 @@ public class JobManagerITCase {
 					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
 			
 			try {
-				
-				// we need to register the job at the library cache manager (with no libraries)
-				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
-				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 				
 				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
@@ -279,9 +272,6 @@ public class JobManagerITCase {
 					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
 			
 			try {
-				// we need to register the job at the library cache manager (with no libraries)
-				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
-				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 
 				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
@@ -341,9 +331,6 @@ public class JobManagerITCase {
 					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
 			
 			try {
-				// we need to register the job at the library cache manager (with no libraries)
-				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
-				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 
 				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
@@ -407,9 +394,6 @@ public class JobManagerITCase {
 					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
 			
 			try {
-				// we need to register the job at the library cache manager (with no libraries)
-				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
-				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 
 				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
@@ -473,9 +457,6 @@ public class JobManagerITCase {
 								.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
 			
 			try {
-				// we need to register the job at the library cache manager (with no libraries)
-				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
-				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 
 				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
@@ -537,9 +518,6 @@ public class JobManagerITCase {
 			try {
 				assertEquals(NUM_TASKS, jm.getTotalNumberOfRegisteredSlots());
 				
-				// we need to register the job at the library cache manager (with no libraries)
-				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
-				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 
 				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
@@ -601,9 +579,6 @@ public class JobManagerITCase {
 			try {
 				assertEquals(NUM_TASKS, jm.getTotalNumberOfRegisteredSlots());
 				
-				// we need to register the job at the library cache manager (with no libraries)
-				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
-				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 
 				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
@@ -661,10 +636,6 @@ public class JobManagerITCase {
 					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
 			
 			try {
-				
-				// we need to register the job at the library cache manager (with no libraries)
-				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
-				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 				
 				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
@@ -729,9 +700,6 @@ public class JobManagerITCase {
 			try {
 				assertEquals(NUM_TASKS, jm.getTotalNumberOfRegisteredSlots());
 				
-				// we need to register the job at the library cache manager (with no libraries)
-				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
-				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 
 				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
@@ -796,9 +764,6 @@ public class JobManagerITCase {
 			try {
 				assertEquals(2*NUM_TASKS, jm.getNumberOfSlotsAvailableToScheduler());
 				
-				// we need to register the job at the library cache manager (with no libraries)
-				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
-				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 
 				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca4380c3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
index e25a9da..29293da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
@@ -22,9 +22,10 @@ import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.client.AbstractJobResult;
 import org.apache.flink.runtime.client.JobSubmissionResult;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.LocalInstanceManager;
 import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
@@ -38,6 +39,8 @@ import org.apache.flink.runtime.jobmanager.tasks.Receiver;
 import org.apache.flink.runtime.jobmanager.tasks.Sender;
 import org.junit.Test;
 
+import java.util.ArrayList;
+
 public class SlotSharingITCase {
 
 	
@@ -73,9 +76,6 @@ public class SlotSharingITCase {
 					.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
 			
 			try {
-				// we need to register the job at the library cache manager (with no libraries)
-				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
-				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 
 				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
@@ -145,9 +145,6 @@ public class SlotSharingITCase {
 								.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
 			
 			try {
-				// we need to register the job at the library cache manager (with no libraries)
-				LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
-				
 				JobSubmissionResult result = jm.submitJob(jobGraph);
 
 				if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {


Mime
View raw message