flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [1/2] flink git commit: [FLINK-6685] Prevent that SafetyNetCloseableRegistry is closed prematurely in Task::triggerCheckpointBarrier
Date Wed, 24 May 2017 12:51:49 GMT
Repository: flink
Updated Branches:
  refs/heads/master ac6e5c9a0 -> 36830adac


[FLINK-6685] Prevent that SafetyNetCloseableRegistry is closed prematurely in Task::triggerCheckpointBarrier


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7639d49e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7639d49e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7639d49e

Branch: refs/heads/master
Commit: 7639d49e03206de68bdd96cb8dd29293583cd7fd
Parents: ac6e5c9
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Tue May 23 15:44:07 2017 +0200
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Wed May 24 14:02:13 2017 +0200

----------------------------------------------------------------------
 .../flink/core/fs/FileSystemSafetyNet.java      | 20 ++++++++++++++++++++
 .../apache/flink/runtime/taskmanager/Task.java  |  9 ++++-----
 2 files changed, 24 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7639d49e/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
index eb28504..1391a33 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
@@ -106,6 +106,26 @@ public class FileSystemSafetyNet {
 		}
 	}
 
+	/**
+	 * Returns the active safety-net registry for the current thread.
+	 * @deprecated This method should be removed after FLINK-6684 is implemented.
+	 */
+	@Deprecated
+	@Internal
+	public static SafetyNetCloseableRegistry getSafetyNetCloseableRegistryForThread() {
+		return REGISTRIES.get();
+	}
+
+	/**
+	 * Sets the active safety-net registry for the current thread.
+	 * @deprecated This method should be removed after FLINK-6684 is implemented.
+	 */
+	@Deprecated
+	@Internal
+	public static void setSafetyNetCloseableRegistryForThread(SafetyNetCloseableRegistry registry)
{
+		REGISTRIES.set(registry);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/7639d49e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index e626dae..e18628e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystemSafetyNet;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.SafetyNetCloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -71,7 +72,6 @@ import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
-
 import org.apache.flink.util.WrappingRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1172,13 +1172,14 @@ public class Task implements Runnable, TaskActions {
 				// build a local closure
 				final StatefulTask statefulTask = (StatefulTask) invokable;
 				final String taskName = taskNameWithSubtask;
-
+				final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
+					FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
 				Runnable runnable = new Runnable() {
 					@Override
 					public void run() {
 						// activate safety net for checkpointing thread
 						LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
-						FileSystemSafetyNet.initializeSafetyNetForThread();
+						FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
 
 						try {
 							boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
@@ -1202,8 +1203,6 @@ public class Task implements Runnable, TaskActions {
 							// close and de-activate safety net for checkpointing thread
 							LOG.debug("Ensuring all FileSystem streams are closed for {}",
 									Thread.currentThread().getName());
-
-							FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
 						}
 					}
 				};


Mime
View raw message