flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/3] flink git commit: [FLINK-1492] [FLINK-1513] Fix BLOB service shutdown message and avoid global configuration
Date Wed, 11 Feb 2015 19:43:23 GMT
Repository: flink
Updated Branches:
  refs/heads/master 90834ca70 -> 6151d707c


[FLINK-1492] [FLINK-1513] Fix BLOB service shutdown message and avoid global configuration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6151d707
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6151d707
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6151d707

Branch: refs/heads/master
Commit: 6151d707c6130a38d18501414447cfa88969d9b8
Parents: 69b7945
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Feb 11 19:08:21 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 11 20:42:43 2015 +0100

----------------------------------------------------------------------
 .../flink/configuration/Configuration.java      |  2 +-
 .../apache/flink/runtime/blob/BlobCache.java    | 53 +++++++++------
 .../flink/runtime/blob/BlobInputStream.java     |  8 ---
 .../apache/flink/runtime/blob/BlobServer.java   | 70 ++++++++++++--------
 .../apache/flink/runtime/blob/BlobService.java  |  2 +-
 .../apache/flink/runtime/blob/BlobUtils.java    | 40 +++++------
 .../flink/runtime/jobmanager/JobManager.scala   |  3 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  5 +-
 .../taskmanager/TaskManagerConfiguration.scala  |  6 +-
 .../flink/runtime/blob/BlobCacheTest.java       | 17 ++---
 .../flink/runtime/blob/BlobClientTest.java      | 11 +--
 .../flink/runtime/blob/BlobUtilsTest.java       | 21 +-----
 .../BlobLibraryCacheManagerTest.java            |  8 +--
 13 files changed, 120 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index fdd3c95..a1515d0 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -366,7 +366,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable,
 		if (o == null) {
 			return defaultValue;
 		}
-		else if (o.getClass() == byte[].class) {
+		else if (o.getClass().equals(byte[].class)) {
 			return (byte[]) o;
 		}
 		else {

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index d0d9a45..40ec4e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,8 +37,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * The BLOB cache implements a local cache for content-addressable BLOBs. When requesting BLOBs through the
  * {@link BlobCache#getURL} methods, the BLOB cache will first attempt serve the file from its local cache. Only if the
  * local cache does not contain the desired BLOB, the BLOB cache will try to download it from the BLOB server.
- * <p>
- * This class is thread-safe.
  */
 public final class BlobCache implements BlobService {
 
@@ -49,16 +49,21 @@ public final class BlobCache implements BlobService {
 
 	private final File storageDir;
 
-	private AtomicBoolean shutdownRequested = new AtomicBoolean();
+	private final AtomicBoolean shutdownRequested = new AtomicBoolean();
 
 	/** Shutdown hook thread to ensure deletion of the storage directory. */
 	private final Thread shutdownHook;
 
-	public BlobCache(InetSocketAddress serverAddress) {
-		this.serverAddress = serverAddress;
 
-		this.storageDir = BlobUtils.initStorageDirectory();
+	public BlobCache(InetSocketAddress serverAddress, Configuration configuration) {
+		if (serverAddress == null || configuration == null) {
+			throw new NullPointerException();
+		}
+
+		this.serverAddress = serverAddress;
 
+		String storageDirectory = configuration.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
+		this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
 		LOG.info("Created BLOB cache storage directory " + storageDir);
 
 		// Add shutdown hook to delete storage directory
@@ -77,7 +82,6 @@ public final class BlobCache implements BlobService {
 	 *         thrown if an I/O error occurs while downloading the BLOBs from the BLOB server
 	 */
 	public URL getURL(final BlobKey requiredBlob) throws IOException {
-
 		if (requiredBlob == null) {
 			throw new IllegalArgumentException("Required BLOB cannot be null.");
 		}
@@ -95,16 +99,8 @@ public final class BlobCache implements BlobService {
 					LOG.debug("Trying to download " + requiredBlob + " from " + serverAddress);
 				}
 
-				if (bc == null) {
-
-					if (serverAddress == null) {
-						throw new IllegalArgumentException(
-							"Argument serverAddress is null: Cannot download libraries from BLOB server");
-					}
-
-					bc = new BlobClient(serverAddress);
-					buf = new byte[BlobServer.BUFFER_SIZE];
-				}
+				bc = new BlobClient(serverAddress);
+				buf = new byte[BlobServer.BUFFER_SIZE];
 
 				InputStream is = null;
 				OutputStream os = null;
@@ -160,11 +156,28 @@ public final class BlobCache implements BlobService {
 	}
 
 	@Override
-	public void shutdown() throws IOException {
+	public void shutdown() {
 		if (shutdownRequested.compareAndSet(false, true)) {
-			FileUtils.deleteDirectory(storageDir);
+			// Clean up the storage directory
+			try {
+				FileUtils.deleteDirectory(storageDir);
+			}
+			catch (IOException e) {
+				LOG.error("BLOB cache failed to properly clean up its storage directory.");
+			}
 
-			Runtime.getRuntime().removeShutdownHook(shutdownHook);
+			// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself
+			if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
+				try {
+					Runtime.getRuntime().removeShutdownHook(shutdownHook);
+				}
+				catch (IllegalStateException e) {
+					// race, JVM is in shutdown already, we can safely ignore this
+				}
+				catch (Throwable t) {
+					LOG.warn("Exception while unregistering BLOB cache's cleanup shutdown hook.");
+				}
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
index f93fd50..3654f8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
@@ -69,7 +69,6 @@ final class BlobInputStream extends InputStream {
 	 *         throws if an I/O error occurs while reading the BLOB data from the BLOB server
 	 */
 	BlobInputStream(final InputStream wrappedInputStream, final BlobKey blobKey, final byte[] buf) throws IOException {
-
 		this.wrappedInputStream = wrappedInputStream;
 		this.blobKey = blobKey;
 		this.bytesToReceive = BlobServer.readLength(buf, wrappedInputStream);
@@ -93,7 +92,6 @@ final class BlobInputStream extends InputStream {
 
 	@Override
 	public int read() throws IOException {
-
 		if (this.bytesReceived == this.bytesToReceive) {
 			return -1;
 		}
@@ -125,7 +123,6 @@ final class BlobInputStream extends InputStream {
 
 	@Override
 	public int read(byte[] b, int off, int len) throws IOException {
-
 		final int bytesMissing = this.bytesToReceive - this.bytesReceived;
 
 		if (bytesMissing == 0) {
@@ -155,13 +152,11 @@ final class BlobInputStream extends InputStream {
 
 	@Override
 	public long skip(long n) throws IOException {
-
 		return 0L;
 	}
 
 	@Override
 	public int available() throws IOException {
-
 		return 0;
 	}
 
@@ -171,19 +166,16 @@ final class BlobInputStream extends InputStream {
 	}
 
 	public void mark(final int readlimit) {
-
 		// Do not do anything here
 	}
 
 	@Override
 	public void reset() throws IOException {
-
 		throw new IOException("mark/reset not supported");
 	}
 
 	@Override
 	public boolean markSupported() {
-
 		return false;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 068a859..220d3a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -30,6 +30,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobID;
 
 import org.slf4j.Logger;
@@ -39,10 +41,8 @@ import org.slf4j.LoggerFactory;
  * This class implements the BLOB server. The BLOB server is responsible for listening for incoming requests and
  * spawning threads to handle these requests. Furthermore, it takes care of creating the directory structure to store
  * the BLOBs or temporarily cache them.
- * <p>
- * This class is thread-safe.
  */
-public final class BlobServer extends Thread implements BlobService{
+public final class BlobServer extends Thread implements BlobService {
 
 	/**
 	 * The log object used for debugging.
@@ -103,7 +103,15 @@ public final class BlobServer extends Thread implements BlobService{
 	 * @throws IOException
 	 *         thrown if the BLOB server cannot bind to a free network port
 	 */
-	public BlobServer() throws IOException {
+	public BlobServer(Configuration config) throws IOException {
+
+		String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
+		this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
+		LOG.info("Created BLOB server storage directory {}", storageDir);
+
+		// Add shutdown hook to delete storage directory
+		this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+
 		try {
 			this.serverSocket = new ServerSocket(0);
 
@@ -113,12 +121,6 @@ public final class BlobServer extends Thread implements BlobService{
 				LOG.info(String.format("Started BLOB server on port %d",
 						this.serverSocket.getLocalPort()));
 			}
-
-			this.storageDir = BlobUtils.initStorageDirectory();
-
-			LOG.info("Created BLOB server storage directory " + storageDir);
-
-			shutdownHook = BlobUtils.addShutdownHook(this, LOG);
 		}
 		catch (IOException e) {
 			throw new IOException("Could not create BlobServer with random port.", e);
@@ -132,7 +134,6 @@ public final class BlobServer extends Thread implements BlobService{
 	 * @return the network port the BLOB server is bound to
 	 */
 	public int getServerPort() {
-
 		return this.serverSocket.getLocalPort();
 	}
 
@@ -143,7 +144,7 @@ public final class BlobServer extends Thread implements BlobService{
 	 * @param key identifying the file
 	 * @return file handle to the file
 	 */
-	public File getStorageLocation(BlobKey key){
+	public File getStorageLocation(BlobKey key) {
 		return BlobUtils.getStorageLocation(storageDir, key);
 	}
 
@@ -154,7 +155,7 @@ public final class BlobServer extends Thread implements BlobService{
 	 * @param key to identify the file within the job context
 	 * @return file handle to the file
 	 */
-	public File getStorageLocation(JobID jobID, String key){
+	public File getStorageLocation(JobID jobID, String key) {
 		return BlobUtils.getStorageLocation(storageDir, jobID, key);
 	}
 
@@ -164,7 +165,7 @@ public final class BlobServer extends Thread implements BlobService{
 	 * @param jobID all files associated to this jobID will be deleted
 	 * @throws IOException
 	 */
-	public void deleteJobDirectory(JobID jobID) throws IOException{
+	public void deleteJobDirectory(JobID jobID) throws IOException {
 		BlobUtils.deleteJobDirectory(storageDir, jobID);
 	}
 
@@ -174,22 +175,21 @@ public final class BlobServer extends Thread implements BlobService{
 	 * @return a temporary file inside the BLOB server's incoming directory
 	 */
 	File getTemporaryFilename() {
-		return new File(BlobUtils.getIncomingDirectory(storageDir), String.format("temp-%08d",
-				tempFileCounter.getAndIncrement()));
+		return new File(BlobUtils.getIncomingDirectory(storageDir),
+				String.format("temp-%08d", tempFileCounter.getAndIncrement()));
 	}
 
 	@Override
 	public void run() {
-
 		try {
-
 			while (!this.shutdownRequested.get()) {
 				new BlobConnection(this.serverSocket.accept(), this).start();
 			}
-
-		} catch (IOException ioe) {
-			if (!this.shutdownRequested.get() && LOG.isErrorEnabled()) {
-				LOG.error("Blob server stopped working.", ioe);
+		}
+		catch (Throwable t) {
+			if (!this.shutdownRequested.get()) {
+				LOG.error("BLOB server stopped working. Shutting down", t);
+				shutdown();
 			}
 		}
 	}
@@ -198,7 +198,7 @@ public final class BlobServer extends Thread implements BlobService{
 	 * Shuts down the BLOB server.
 	 */
 	@Override
-	public void shutdown() throws IOException {
+	public void shutdown() {
 		if (shutdownRequested.compareAndSet(false, true)) {
 			try {
 				this.serverSocket.close();
@@ -214,12 +214,26 @@ public final class BlobServer extends Thread implements BlobService{
 			}
 
 			// Clean up the storage directory
-			FileUtils.deleteDirectory(storageDir);
-
-			// Remove shutdown hook to prevent resource leaks
-			Runtime.getRuntime().removeShutdownHook(shutdownHook);
+			try {
+				FileUtils.deleteDirectory(storageDir);
+			}
+			catch (IOException e) {
+				LOG.error("BLOB server failed to properly clean up its storage directory.");
+			}
 
-			// TODO: Find/implement strategy to handle content-addressable BLOBs
+			// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
+			// shutdown hook itself
+			if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
+				try {
+					Runtime.getRuntime().removeShutdownHook(shutdownHook);
+				}
+				catch (IllegalStateException e) {
+					// race, JVM is in shutdown already, we can safely ignore this
+				}
+				catch (Throwable t) {
+					LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook.");
+				}
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
index b6ed249..148476f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
@@ -49,5 +49,5 @@ public interface BlobService {
 	 * Shutdown method which is called to terminate the blob service.
 	 * @throws IOException
 	 */
-	void shutdown() throws IOException;
+	void shutdown();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 476f481..53cab1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -20,8 +20,6 @@ package org.apache.flink.runtime.blob;
 
 import com.google.common.io.BaseEncoding;
 import org.apache.commons.io.FileUtils;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.slf4j.Logger;
 
@@ -61,15 +59,10 @@ public class BlobUtils {
 	 *
 	 * @return the storage directory used by a BLOB service
 	 */
-	static File initStorageDirectory() {
-		File baseDir;
-		String sd = GlobalConfiguration.getString(
-				ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
-		if (sd != null) {
-			baseDir = new File(sd);
-		} else {
-			baseDir = new File(System.getProperty("java.io.tmpdir"));
-		}
+	static File initStorageDirectory(String storageDirectory) {
+		File baseDir = storageDirectory != null ?
+				new File(storageDirectory) :
+				new File(System.getProperty("java.io.tmpdir"));
 
 		File storageDir;
 		final int MAX_ATTEMPTS = 10;
@@ -87,7 +80,7 @@ public class BlobUtils {
 		}
 
 		// max attempts exceeded to find a storage directory
-		throw new RuntimeException("Could not create storage directory in '" + baseDir + "'.");
+		throw new RuntimeException("Could not create storage directory for BLOB store in '" + baseDir + "'.");
 	}
 
 	/**
@@ -139,9 +132,7 @@ public class BlobUtils {
 	 *        the key of the BLOB
 	 * @return the (designated) physical storage location of the BLOB with the given job ID and key
 	 */
-	static File getStorageLocation(final File storageDir, final JobID jobID,
-									final String key) {
-
+	static File getStorageLocation(final File storageDir, final JobID jobID, final String key) {
 		return new File(getJobDirectory(storageDir, jobID), BLOB_FILE_PREFIX + encodeKey(key));
 	}
 
@@ -192,7 +183,7 @@ public class BlobUtils {
 		try {
 			return MessageDigest.getInstance(HASHING_ALGORITHM);
 		} catch (NoSuchAlgorithmException e) {
-			throw new RuntimeException(e);
+			throw new RuntimeException("Cannot instantiate the message digest algorithm " + HASHING_ALGORITHM, e);
 		}
 	}
 
@@ -215,9 +206,18 @@ public class BlobUtils {
 			}
 		});
 
-		// Add JVM shutdown hook to call shutdown of service
-		Runtime.getRuntime().addShutdownHook(shutdownHook);
-
-		return shutdownHook;
+		try {
+			// Add JVM shutdown hook to call shutdown of service
+			Runtime.getRuntime().addShutdownHook(shutdownHook);
+			return shutdownHook;
+		}
+		catch (IllegalStateException e) {
+			// JVM is already shutting down. no need to do our work
+			return null;
+		}
+		catch (Throwable t) {
+			logger.error("Cannot register shutdown hook that cleanly terminates the BLOB service.");
+			return null;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index b20fd01..4636999 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -113,7 +113,8 @@ Actor with ActorLogMessages with ActorLogging {
   val accumulatorManager = new AccumulatorManager(Math.min(1, archiveCount))
   val instanceManager = new InstanceManager()
   val scheduler = new FlinkScheduler()
-  val libraryCacheManager = new BlobLibraryCacheManager(new BlobServer(), cleanupInterval)
+  val libraryCacheManager = new BlobLibraryCacheManager(
+                                        new BlobServer(configuration), cleanupInterval)
 
   // List of current jobs running
   val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index ceb070f..1bfe172 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -582,7 +582,8 @@ import scala.collection.JavaConverters._
 
       log.info("Determined BLOB server address to be {}.", address)
 
-      libraryCacheManager = new BlobLibraryCacheManager(new BlobCache(address), cleanupInterval)
+      libraryCacheManager = new BlobLibraryCacheManager(
+                                     new BlobCache(address, configuration), cleanupInterval)
     } else {
       libraryCacheManager = new FallbackLibraryCacheManager
     }
@@ -871,7 +872,7 @@ object TaskManager {
 
     val taskManagerConfig = TaskManagerConfiguration(numberOfSlots, memorySize, pageSize,
       tmpDirs, cleanupInterval, memoryLoggingIntervalMs, profilingInterval, timeout,
-      maxRegistrationDuration)
+      maxRegistrationDuration, configuration)
 
     (connectionInfo, jobManagerURL, taskManagerConfig, networkConfig)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
index 82cbe9e..8c1217e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
@@ -18,10 +18,14 @@
 
 package org.apache.flink.runtime.taskmanager
 
+import org.apache.flink.configuration.Configuration
+
 import scala.concurrent.duration.{Duration, FiniteDuration}
 
 case class TaskManagerConfiguration(numberOfSlots: Int, memorySize: Long, pageSize: Int,
                                     tmpDirPaths: Array[String], cleanupInterval: Long,
                                     memoryLogggingIntervalMs: Option[Long],
                                     profilingInterval: Option[Long],
-                                    timeout: FiniteDuration, maxRegistrationDuration: Duration)
+                                    timeout: FiniteDuration,
+                                    maxRegistrationDuration: Duration,
+                                    configuration: Configuration)

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java
index d456135..32c8c3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java
@@ -31,6 +31,7 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.configuration.Configuration;
 import org.junit.Test;
 
 /**
@@ -50,7 +51,7 @@ public class BlobCacheTest {
 		try {
 
 			// Start the BLOB server
-			blobServer = new BlobServer();
+			blobServer = new BlobServer(new Configuration());
 			final InetSocketAddress serverAddress = new InetSocketAddress(blobServer.getServerPort());
 
 			// Upload BLOBs
@@ -68,7 +69,7 @@ public class BlobCacheTest {
 				}
 			}
 
-			blobCache = new BlobCache(serverAddress);
+			blobCache = new BlobCache(serverAddress, new Configuration());
 
 			for(int i = 0; i < blobKeys.size(); i++){
 				blobCache.getURL(blobKeys.get(i));
@@ -109,19 +110,11 @@ public class BlobCacheTest {
 			fail(ioe.getMessage());
 		} finally {
 			if (blobServer != null) {
-				try {
-					blobServer.shutdown();
-				} catch (IOException e) {
-					e.printStackTrace();
-				}
+				blobServer.shutdown();
 			}
 
 			if(blobCache != null){
-				try {
-					blobCache.shutdown();
-				} catch (IOException e) {
-					e.printStackTrace();
-				}
+				blobCache.shutdown();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 2542bbb..1465777 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -31,6 +31,7 @@ import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.security.MessageDigest;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.util.StringUtils;
 import org.junit.AfterClass;
@@ -57,9 +58,8 @@ public class BlobClientTest {
 	 */
 	@BeforeClass
 	public static void startServer() {
-
 		try {
-			BLOB_SERVER = new BlobServer();
+			BLOB_SERVER = new BlobServer(new Configuration());
 		} catch (IOException ioe) {
 			fail(StringUtils.stringifyException(ioe));
 		}
@@ -71,13 +71,8 @@ public class BlobClientTest {
 	 */
 	@AfterClass
 	public static void stopServer() {
-
 		if (BLOB_SERVER != null) {
-			try {
-				BLOB_SERVER.shutdown();
-			} catch (IOException e) {
-				e.printStackTrace();
-			}
+			BLOB_SERVER.shutdown();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
index 15a63b3..a5c83b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
@@ -18,33 +18,18 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
+import static org.mockito.Mockito.mock;
+
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.File;
 
-import static org.mockito.Mockito.mock;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(BlobKey.class)
 public class BlobUtilsTest {
 
 	@Test(expected = Exception.class)
 	public void testExceptionOnCreateStorageDirectoryFailure() {
-
-		// Configure a non existing directory
-		Configuration config = new Configuration();
-		config.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, "/cannot-create-this");
-
-		GlobalConfiguration.includeConfiguration(config);
-
 		// Should throw an Exception
-		BlobUtils.initStorageDirectory();
+		BlobUtils.initStorageDirectory("/cannot-create-this");
 	}
 
 	@Test(expected = Exception.class)

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 71c0669..2675346 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -51,7 +51,7 @@ public class BlobLibraryCacheManagerTest {
 		final byte[] buf = new byte[128];
 
 		try {
-			server = new BlobServer();
+			server = new BlobServer(new Configuration());
 			InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getServerPort());
 			BlobClient bc = new BlobClient(blobSocketAddress);
 
@@ -109,11 +109,7 @@ public class BlobLibraryCacheManagerTest {
 		}
 		finally{
 			if (server != null){
-				try {
-					server.shutdown();
-				} catch (IOException e) {
-					e.printStackTrace();
-				}
+				server.shutdown();
 			}
 
 			if (libraryCacheManager != null){


Mime
View raw message