flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [07/11] git commit: [FLINK-1152] More robust resource release when tasks are canceled during deployment
Date Fri, 17 Oct 2014 14:52:34 GMT
[FLINK-1152] More robust resource release when tasks are canceled during deployment


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/446a2471
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/446a2471
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/446a2471

Branch: refs/heads/release-0.7
Commit: 446a2471e70abecc1e4b78675f42d59390b27bd8
Parents: da54db6
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Oct 13 20:07:59 2014 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Fri Oct 17 16:48:55 2014 +0200

----------------------------------------------------------------------
 .../flink/runtime/filecache/FileCache.java      |  26 ++--
 .../runtime/io/network/ChannelManager.java      |   3 +
 .../apache/flink/runtime/taskmanager/Task.java  |   5 +-
 .../flink/runtime/taskmanager/TaskManager.java  | 140 ++++++++++---------
 4 files changed, 94 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/446a2471/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index c6832f7..de8d59c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.filecache;
 
 import java.io.File;
@@ -44,6 +43,8 @@ import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The FileCache is used to create the local files for the registered cache files when a
task is deployed. 
@@ -52,9 +53,12 @@ import org.apache.flink.runtime.util.IOUtils;
  */
 public class FileCache {
 
-	private LocalFileSystem lfs = new LocalFileSystem();
-
+	private static final Logger LOG = LoggerFactory.getLogger(FileCache.class);
+	
 	private static final Object lock = new Object();
+	
+	
+	private LocalFileSystem lfs = new LocalFileSystem();
 
 	private Map<Pair<JobID, String>, Integer> count = new HashMap<Pair<JobID,String>,
Integer>();
 
@@ -99,10 +103,11 @@ public class FileCache {
 	}
 
 	public void shutdown() {
-		if (this.executorService != null) {
-			this.executorService.shutdown();
+		ScheduledExecutorService es = this.executorService;
+		if (es != null) {
+			es.shutdown();
 			try {
-				this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
+				es.awaitTermination(5000L, TimeUnit.MILLISECONDS);
 			} catch (InterruptedException e) {
 				throw new RuntimeException("Error shutting down the file cache", e);
 			}
@@ -133,6 +138,7 @@ public class FileCache {
 					IOUtils.copyBytes(fsInput, lfsOutput);
 					new File(targetPath.toString()).setExecutable(executable);
 				} catch (IOException ioe) {
+					LOG.error("could not copy file to local file cache.", ioe);
 				}
 			}
 		}
@@ -159,8 +165,8 @@ public class FileCache {
 				synchronized (lock) {
 					copy(new Path(filePath), tmp, this.executable);
 				}
-			} catch (IOException e1) {
-				throw new RuntimeException("Error copying a file from hdfs to the local fs", e1);
+			} catch (IOException e) {
+				LOG.error("Could not copy file to local file cache.", e);
 			}
 			return tmp;
 		}
@@ -192,8 +198,8 @@ public class FileCache {
 				if (lfs.exists(tmp)) {
 					lfs.delete(tmp, true);
 				}
-			} catch (IOException e1) {
-				throw new RuntimeException("Error deleting the file", e1);
+			} catch (IOException e) {
+				LOG.error("Could not delete file from local file cache.", e);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/446a2471/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
index 38d40ee..59084e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
@@ -197,6 +197,9 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 	 */
 	public void unregister(ExecutionAttemptID executionId, Task task) {
 		final Environment environment = task.getEnvironment();
+		if (environment == null) {
+			return;
+		}
 
 		// destroy and remove OUTPUT channels from registered channels and cache
 		for (ChannelID id : environment.getOutputChannelIDs()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/446a2471/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index d393e2e..12c55bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -331,8 +331,9 @@ public final class Task {
 	 *        the central memory manager
 	 */
 	public void unregisterMemoryManager(MemoryManager memoryManager) {
-		if (memoryManager != null) {
-			memoryManager.releaseAll(this.environment.getInvokable());
+		RuntimeEnvironment env = this.environment;
+		if (memoryManager != null && env != null) {
+			memoryManager.releaseAll(env.getInvokable());
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/446a2471/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index 44327e1..1498a1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -570,87 +570,80 @@ public class TaskManager implements TaskOperationProtocol {
 		final int taskIndex = tdd.getIndexInSubtaskGroup();
 		final int numSubtasks = tdd.getCurrentNumberOfSubtasks();
 		
+		Task task = null;
 		boolean jarsRegistered = false;
+		boolean success = false;
 		
 		try {
 			// Now register data with the library manager
 			libraryCacheManager.register(jobID, tdd.getRequiredJarFiles());
-
-			// library and classloader issues first
 			jarsRegistered = true;
-
+			
+			// library and classloader issues first
 			final ClassLoader userCodeClassLoader = libraryCacheManager.getClassLoader(jobID);
 			if (userCodeClassLoader == null) {
 				throw new Exception("No user code ClassLoader available.");
 			}
 			
-			final Task task = new Task(jobID, vertexId, taskIndex, numSubtasks, executionId, tdd.getTaskName(),
this);
+			task = new Task(jobID, vertexId, taskIndex, numSubtasks, executionId, tdd.getTaskName(),
this);
 			if (this.runningTasks.putIfAbsent(executionId, task) != null) {
 				throw new Exception("TaskManager contains already a task with executionId " + executionId);
 			}
 			
-			// another try/finally-success block to ensure that the tasks are removed properly in
case of an exception
-			boolean success = false;
-			try {
-				final InputSplitProvider splitProvider = new TaskInputSplitProvider(this.globalInputSplitProvider,
jobID, vertexId);
-				final RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, userCodeClassLoader,
this.memoryManager, this.ioManager, splitProvider, this.accumulatorProtocolProxy);
-				task.setEnvironment(env);
-				
-				// register the task with the network stack and profilers
-				this.channelManager.register(task);
-				
-				final Configuration jobConfig = tdd.getJobConfiguration();
-	
-				boolean enableProfiling = this.profiler != null && jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY,
true);
-	
-				// Register environment, input, and output gates for profiling
-				if (enableProfiling) {
-					task.registerProfiler(this.profiler, jobConfig);
-				}
-				
-				// now that the task is successfully created and registered, we can start copying the
-				// distributed cache temp files
-				Map<String, FutureTask<Path>> cpTasks = new HashMap<String, FutureTask<Path>>();
-				for (Entry<String, DistributedCacheEntry> e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration()))
{
-					FutureTask<Path> cp = this.fileCache.createTmpFile(e.getKey(), e.getValue(), jobID);
-					cpTasks.put(e.getKey(), cp);
-				}
-				env.addCopyTasksForCacheFile(cpTasks);
-				
-				if (!task.startExecution()) {
-					throw new Exception("Cannot start task. Task was canceled or failed.");
-				}
+			final InputSplitProvider splitProvider = new TaskInputSplitProvider(this.globalInputSplitProvider,
jobID, vertexId);
+			final RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, userCodeClassLoader,
this.memoryManager, this.ioManager, splitProvider, this.accumulatorProtocolProxy);
+			task.setEnvironment(env);
 			
-				// final check that we can go (we do this after the registration, so the the "happen's
before"
-				// relationship ensures that either the shutdown removes this task, or we are aware of
the shutdown
-				if (shutdownStarted.get()) {
-					throw new Exception("Task Manager is shut down.");
-				}
-				
-				success = true;
-				return new TaskOperationResult(executionId, true);
+			// register the task with the network stack and profilers
+			this.channelManager.register(task);
+			
+			final Configuration jobConfig = tdd.getJobConfiguration();
+
+			boolean enableProfiling = this.profiler != null && jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY,
true);
+
+			// Register environment, input, and output gates for profiling
+			if (enableProfiling) {
+				task.registerProfiler(this.profiler, jobConfig);
 			}
-			finally {
-				if (!success) {
-					// remove task 
-					this.runningTasks.remove(executionId);
-					
-					// delete distributed cache files
-					for (Entry<String, DistributedCacheEntry> e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration()))
{
-						this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), jobID);
-					}
-				}
+			
+			// now that the task is successfully created and registered, we can start copying the
+			// distributed cache temp files
+			Map<String, FutureTask<Path>> cpTasks = new HashMap<String, FutureTask<Path>>();
+			for (Entry<String, DistributedCacheEntry> e : DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration()))
{
+				FutureTask<Path> cp = this.fileCache.createTmpFile(e.getKey(), e.getValue(), jobID);
+				cpTasks.put(e.getKey(), cp);
 			}
-		}
-		catch (Throwable t) {
-			LOG.error("Could not instantiate task", t);
+			env.addCopyTasksForCacheFile(cpTasks);
 			
-			if (jarsRegistered) {
-				libraryCacheManager.unregister(jobID);
+			if (!task.startExecution()) {
+				return new TaskOperationResult(executionId, false, "Task was canceled or failed.");
+			}
+		
+			// final check that we can go (we do this after the registration, so the the "happen's
before"
+			// relationship ensures that either the shutdown removes this task, or we are aware of
the shutdown
+			if (shutdownStarted.get()) {
+				throw new Exception("Task Manager is shut down.");
 			}
 			
+			success = true;
+			return new TaskOperationResult(executionId, true);
+		}
+		catch (Throwable t) {
+			LOG.error("Could not instantiate task", t);
 			return new TaskOperationResult(executionId, false, ExceptionUtils.stringifyException(t));
 		}
+		finally {
+			if (!success) {
+				this.runningTasks.remove(executionId);
+				
+				if (task != null) {
+					removeAllTaskResources(task);
+				}
+				if (jarsRegistered) {
+					libraryCacheManager.unregister(jobID);
+				}
+			}
+		}
 	}
 
 	/**
@@ -660,7 +653,6 @@ public class TaskManager implements TaskOperationProtocol {
 	 *        the ID of the task to be unregistered
 	 */
 	private void unregisterTask(ExecutionAttemptID executionId) {
-
 		// Task de-registration must be atomic
 		final Task task = this.runningTasks.remove(executionId);
 		if (task == null) {
@@ -670,22 +662,34 @@ public class TaskManager implements TaskOperationProtocol {
 			return;
 		}
 
-		// remove the local tmp file for unregistered tasks.
-		for (Entry<String, DistributedCacheEntry> e: DistributedCache.readFileInfoFromConfig(task.getEnvironment().getJobConfiguration()))
{
-			this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), task.getJobID());
-		}
-		
+		removeAllTaskResources(task);
+
+		// Unregister task from library cache manager
+		libraryCacheManager.unregister(task.getJobID());
+	}
+	
+	private void removeAllTaskResources(Task task) {
 		// Unregister task from the byte buffered channel manager
-		this.channelManager.unregister(executionId, task);
+		this.channelManager.unregister(task.getExecutionId(), task);
 
 		// Unregister task from profiling
 		task.unregisterProfiler(this.profiler);
 
 		// Unregister task from memory manager
 		task.unregisterMemoryManager(this.memoryManager);
-
-		// Unregister task from library cache manager
-		libraryCacheManager.unregister(task.getJobID());
+		
+		// remove the local tmp file for unregistered tasks.
+		try {
+			RuntimeEnvironment re = task.getEnvironment();
+			if (re != null) {
+				for (Entry<String, DistributedCacheEntry> e: DistributedCache.readFileInfoFromConfig(task.getEnvironment().getJobConfiguration()))
{
+					this.fileCache.deleteTmpFile(e.getKey(), e.getValue(), task.getJobID());
+				}
+			}
+		}
+		catch (Throwable t) {
+			LOG.error("Error cleaning up local files from the distributed cache.", t);
+		}
 	}
 
 	public void notifyExecutionStateChange(JobID jobID, ExecutionAttemptID executionId, ExecutionState
newExecutionState, Throwable optionalError) {


Mime
View raw message