flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/2] flink git commit: [FLINK-7669] Always load Flink classes via parent ClassLoader
Date Wed, 25 Oct 2017 13:57:45 GMT
Repository: flink
Updated Branches:
  refs/heads/master 41e5a8338 -> 3bbff97b7


[FLINK-7669] Always load Flink classes via parent ClassLoader

Before, when setting classloader.resolve-order, Flink classes were also
resolved via the child ClassLoader which could lead to problems if the
user-jar contains copies of Flink classes. This change introduces a new
setting classloader.parent-first-patterns. Classes that match this
pattern will always be resolved by the parent first, even when using the
child-first ClassLoader.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d88ab85
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d88ab85
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d88ab85

Branch: refs/heads/master
Commit: 2d88ab85ba8ff1f2e920e8a142c9b702c7f1e6da
Parents: 41e5a83
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Oct 23 17:03:21 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Wed Oct 25 15:55:46 2017 +0200

----------------------------------------------------------------------
 docs/ops/config.md                              |  7 +++++
 .../state/RocksDbMultiClassLoaderTest.java      |  4 +--
 .../apache/flink/configuration/CoreOptions.java |  5 +++
 .../librarycache/BlobLibraryCacheManager.java   | 25 ++++++++++++---
 .../librarycache/FlinkUserCodeClassLoaders.java | 31 ++++++++++++------
 .../runtime/jobmaster/JobManagerServices.java   |  9 +++++-
 .../runtime/taskexecutor/TaskExecutor.java      |  4 ++-
 .../taskexecutor/TaskManagerConfiguration.java  | 16 ++++++++--
 .../flink/runtime/jobmanager/JobManager.scala   |  8 ++++-
 .../flink/runtime/taskmanager/TaskManager.scala |  4 ++-
 .../BlobLibraryCacheManagerTest.java            |  8 ++---
 .../BlobLibraryCacheRecoveryITCase.java         |  2 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  4 +--
 .../JobManagerLeaderElectionTest.java           |  2 +-
 ...askManagerComponentsStartupShutdownTest.java |  3 +-
 .../runtime/util/JvmExitOnFatalErrorTest.java   |  3 +-
 .../runtime/tasks/BlockingCheckpointsTest.java  |  3 +-
 .../tasks/InterruptSensitiveRestoreTest.java    |  3 +-
 .../tasks/StreamTaskTerminationTest.java        |  3 +-
 .../test_streaming_classloader.sh               | 33 ++++++++++++++++++++
 20 files changed, 142 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index d476492..7720e3a 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -77,6 +77,13 @@ without explicit scheme definition, such as `/user/USERNAME/in.txt`, is
going to
 - `classloader.resolve-order`: Whether Flink should use a child-first `ClassLoader` when
loading
 user-code classes or a parent-first `ClassLoader`. Can be one of `parent-first` or `child-first`.
(default: `child-first`)
 
+- `classloader.parent-first-patterns`: A (semicolon-separated) list of patterns that specifies which
+classes should always be resolved through the parent `ClassLoader` first. A pattern is a simple
+prefix that is checked against the fully qualified class name. By default, this is set to
+`java.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback`.
+If you want to change this setting you have to make sure to also include the default patterns in
+your list of patterns if you want to keep that default behaviour.
+
 ## Advanced Options
 
 ### Compute

http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
index 1dbc05e..72c85ec 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
@@ -47,8 +47,8 @@ public class RocksDbMultiClassLoaderTest {
 		final URL codePath2 = RocksDB.class.getProtectionDomain().getCodeSource().getLocation();
 
 		final ClassLoader parent = getClass().getClassLoader();
-		final ClassLoader loader1 = FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1,
codePath2 }, parent);
-		final ClassLoader loader2 = FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1,
codePath2 }, parent);
+		final ClassLoader loader1 = FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1,
codePath2 }, parent, new String[0]);
+		final ClassLoader loader2 = FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1,
codePath2 }, parent, new String[0]);
 
 		final String className = RocksDBStateBackend.class.getName();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index d1005c4..e8ab8e4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -31,6 +31,11 @@ public class CoreOptions {
 		.key("classloader.resolve-order")
 		.defaultValue("child-first");
 
+	public static final ConfigOption<String> ALWAYS_PARENT_FIRST_LOADER = ConfigOptions
+		.key("classloader.parent-first-patterns")
+		.defaultValue("java.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback");
+
+
 	// ------------------------------------------------------------------------
 	//  process parameters
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 86ebfcc..b90e995 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -67,12 +67,21 @@ public class BlobLibraryCacheManager implements LibraryCacheManager {
 	/** The resolve order to use when creating a {@link ClassLoader}. */
 	private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder;
 
+	/**
+	 * List of patterns for classes that should always be resolved from the parent ClassLoader,
+	 * if possible.
+	 */
+	private final String[] alwaysParentFirstPatterns;
+
 	// --------------------------------------------------------------------------------------------
 
-	public BlobLibraryCacheManager(PermanentBlobService blobService,
-			FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder) {
+	public BlobLibraryCacheManager(
+			PermanentBlobService blobService,
+			FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
+			String[] alwaysParentFirstPatterns) {
 		this.blobService = checkNotNull(blobService);
 		this.classLoaderResolveOrder = checkNotNull(classLoaderResolveOrder);
+		this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;
 	}
 
 	@Override
@@ -118,7 +127,7 @@ public class BlobLibraryCacheManager implements LibraryCacheManager {
 					}
 
 					cacheEntries.put(jobId, new LibraryCacheEntry(
-						requiredJarFiles, requiredClasspaths, urls, task, classLoaderResolveOrder));
+						requiredJarFiles, requiredClasspaths, urls, task, classLoaderResolveOrder, alwaysParentFirstPatterns));
 				} catch (Throwable t) {
 					// rethrow or wrap
 					ExceptionUtils.tryRethrowIOException(t);
@@ -243,19 +252,25 @@ public class BlobLibraryCacheManager implements LibraryCacheManager
{
 		 * 		<tt>requiredLibraries</tt> and <tt>requiredClasspaths</tt>)
 		 * @param initialReference
 		 * 		reference holder ID
+		 * @param classLoaderResolveOrder Whether to resolve classes first in the child ClassLoader
+		 * 		or parent ClassLoader
+		 * @param alwaysParentFirstPatterns A list of patterns for classes that should always be
+		 * 		resolved from the parent ClassLoader (if possible).
 		 */
 		LibraryCacheEntry(
 				Collection<PermanentBlobKey> requiredLibraries,
 				Collection<URL> requiredClasspaths,
 				URL[] libraryURLs,
 				ExecutionAttemptID initialReference,
-				FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder) {
+				FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
+				String[] alwaysParentFirstPatterns) {
 
 			this.classLoader =
 				FlinkUserCodeClassLoaders.create(
 					classLoaderResolveOrder,
 					libraryURLs,
-					FlinkUserCodeClassLoaders.class.getClassLoader());
+					FlinkUserCodeClassLoaders.class.getClassLoader(),
+					alwaysParentFirstPatterns);
 
 			// NOTE: do not store the class paths, i.e. URLs, into a set for performance reasons
 			//       see http://findbugs.sourceforge.net/bugDescriptions.html#DMI_COLLECTION_OF_URLS

http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
index ef36c36..d40802e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
@@ -31,24 +31,23 @@ import java.util.List;
  */
 public class FlinkUserCodeClassLoaders {
 
-	public static URLClassLoader parentFirst(URL[] urls) {
-		return new ParentFirstClassLoader(urls);
-	}
-
 	public static URLClassLoader parentFirst(URL[] urls, ClassLoader parent) {
 		return new ParentFirstClassLoader(urls, parent);
 	}
 
-	public static URLClassLoader childFirst(URL[] urls, ClassLoader parent) {
-		return new ChildFirstClassLoader(urls, parent);
+	public static URLClassLoader childFirst(
+		URL[] urls,
+		ClassLoader parent,
+		String[] alwaysParentFirstPatterns) {
+		return new ChildFirstClassLoader(urls, parent, alwaysParentFirstPatterns);
 	}
 
 	public static URLClassLoader create(
-		ResolveOrder resolveOrder, URL[] urls, ClassLoader parent) {
+		ResolveOrder resolveOrder, URL[] urls, ClassLoader parent, String[] alwaysParentFirstPatterns)
{
 
 		switch (resolveOrder) {
 			case CHILD_FIRST:
-				return childFirst(urls, parent);
+				return childFirst(urls, parent, alwaysParentFirstPatterns);
 			case PARENT_FIRST:
 				return parentFirst(urls, parent);
 			default:
@@ -95,14 +94,28 @@ public class FlinkUserCodeClassLoaders {
 	 */
 	static final class ChildFirstClassLoader extends URLClassLoader {
 
-		public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
+		/**
+		 * The classes that should always go through the parent ClassLoader. This is relevant
+		 * for Flink classes, for example, to avoid loading Flink classes that cross the
+		 * user-code/system-code barrier in the user-code ClassLoader.
+		 */
+		private final String[] alwaysParentFirstPatterns;
+
+		public ChildFirstClassLoader(URL[] urls, ClassLoader parent, String[] alwaysParentFirstPatterns)
{
 			super(urls, parent);
+			this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;
 		}
 
 		@Override
 		protected synchronized Class<?> loadClass(
 			String name, boolean resolve) throws ClassNotFoundException {
 
+			for (String alwaysParentFirstPattern : alwaysParentFirstPatterns) {
+				if (name.startsWith(alwaysParentFirstPattern)) {
+					return super.loadClass(name, resolve);
+				}
+			}
+
 			// First, check if the class has already been loaded
 			Class<?> c = findLoadedClass(name);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index f7daabb..6abb1af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -116,8 +116,15 @@ public class JobManagerServices {
 		final String classLoaderResolveOrder =
 			config.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
 
+		final String alwaysParentFirstLoaderString =
+			config.getString(CoreOptions.ALWAYS_PARENT_FIRST_LOADER);
+		final String[] alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderString.split(";");
+
 		final BlobLibraryCacheManager libraryCacheManager =
-			new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder));
+			new BlobLibraryCacheManager(
+				blobServer,
+				FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
+				alwaysParentFirstLoaderPatterns);
 
 		final FiniteDuration timeout;
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index d6295b9..cd67705 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -957,7 +957,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway
{
 				taskManagerConfiguration.getConfiguration(),
 				haServices.createBlobStore());
 			libraryCacheManager = new BlobLibraryCacheManager(
-				blobService.getPermanentBlobService(), taskManagerConfiguration.getClassLoaderResolveOrder());
+				blobService.getPermanentBlobService(),
+				taskManagerConfiguration.getClassLoaderResolveOrder(),
+				taskManagerConfiguration.getAlwaysParentFirstLoaderPatterns());
 		} catch (IOException e) {
 			// Can't pass the IOException up - we need a RuntimeException anyway
 			// two levels up where this is run asynchronously. Also, we don't

http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 60dd643..f55e6f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -62,6 +62,8 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo
{
 
 	private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder;
 
+	private final String[] alwaysParentFirstLoaderPatterns;
+
 	public TaskManagerConfiguration(
 		int numberSlots,
 		String[] tmpDirectories,
@@ -73,7 +75,8 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo
{
 		long cleanupInterval,
 		Configuration configuration,
 		boolean exitJvmOnOutOfMemory,
-		FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder) {
+		FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder,
+		String[] alwaysParentFirstLoaderPatterns) {
 
 		this.numberSlots = numberSlots;
 		this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories);
@@ -85,6 +88,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo
{
 		this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
 		this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
 		this.classLoaderResolveOrder = classLoaderResolveOrder;
+		this.alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderPatterns;
 	}
 
 	public int getNumberSlots() {
@@ -130,6 +134,10 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo
{
 		return classLoaderResolveOrder;
 	}
 
+	public String[] getAlwaysParentFirstLoaderPatterns() {
+		return alwaysParentFirstLoaderPatterns;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Static factory methods
 	// --------------------------------------------------------------------------------------------
@@ -225,6 +233,9 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo
{
 		final String classLoaderResolveOrder =
 			configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
 
+		final String alwaysParentFirstLoaderString =
+			configuration.getString(CoreOptions.ALWAYS_PARENT_FIRST_LOADER);
+		final String[] alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderString.split(";");
 
 		return new TaskManagerConfiguration(
 			numberSlots,
@@ -237,6 +248,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo
{
 			cleanupInterval,
 			configuration,
 			exitOnOom,
-			FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder));
+			FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
+			alwaysParentFirstLoaderPatterns);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index bbb8275..3bb7faa 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2483,6 +2483,9 @@ object JobManager {
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
 
     val classLoaderResolveOrder = configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER)
+    val alwaysParentFirstLoaderString =
+      configuration.getString(CoreOptions.ALWAYS_PARENT_FIRST_LOADER)
+    val alwaysParentFirstLoaderPatterns = alwaysParentFirstLoaderString.split(';')
 
     val restartStrategy = RestartStrategyFactory.createRestartStrategyFactory(configuration)
 
@@ -2516,7 +2519,10 @@ object JobManager {
       instanceManager = new InstanceManager()
       scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor))
       libraryCacheManager =
-        new BlobLibraryCacheManager(blobServer, ResolveOrder.fromString(classLoaderResolveOrder))
+        new BlobLibraryCacheManager(
+          blobServer,
+          ResolveOrder.fromString(classLoaderResolveOrder),
+          alwaysParentFirstLoaderPatterns)
 
       instanceManager.addInstanceListener(scheduler)
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 75708d1..1219fc9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -968,7 +968,9 @@ class TaskManager(
       blobCache = Option(blobcache)
       libraryCacheManager = Some(
         new BlobLibraryCacheManager(
-            blobcache.getPermanentBlobService, config.getClassLoaderResolveOrder()))
+          blobcache.getPermanentBlobService,
+          config.getClassLoaderResolveOrder(),
+          config.getAlwaysParentFirstLoaderPatterns))
     }
     catch {
       case e: Exception =>

http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 080f743..83b6814 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -92,7 +92,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
 			keys1.add(server.putPermanent(jobId1, buf));
 			keys2.add(server.putPermanent(jobId2, buf));
 
-			libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
+			libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
new String[0]);
 			cache.registerJob(jobId1);
 			cache.registerJob(jobId2);
 
@@ -222,7 +222,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
 			buf[0] += 1;
 			keys.add(server.putPermanent(jobId, buf));
 
-			libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
+			libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
new String[0]);
 			cache.registerJob(jobId);
 
 			assertEquals(0, libCache.getNumberOfManagedJobs());
@@ -334,7 +334,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
 			buf[0] += 1;
 			keys.add(server.putPermanent(jobId, buf));
 
-			libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
+			libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
new String[0]);
 			cache.registerJob(jobId);
 
 			assertEquals(0, libCache.getNumberOfManagedJobs());
@@ -440,7 +440,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger {
 			PermanentBlobKey dataKey1 = server.putPermanent(jobId, new byte[]{1, 2, 3, 4, 5, 6, 7,
8});
 			PermanentBlobKey dataKey2 = server.putPermanent(jobId, new byte[]{11, 12, 13, 14, 15,
16, 17, 18});
 
-			libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
+			libCache = new BlobLibraryCacheManager(cache, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
new String[0]);
 			assertEquals(0, libCache.getNumberOfManagedJobs());
 			checkFileCountForJob(2, jobId, server);
 			checkFileCountForJob(0, jobId, cache);

http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index 2eac17a..ad18ae3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -83,7 +83,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 				server[i] = new BlobServer(config, blobStoreService);
 				server[i].start();
 				serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
-				libServer[i] = new BlobLibraryCacheManager(server[i], FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
+				libServer[i] = new BlobLibraryCacheManager(server[i], FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
new String[0]);
 			}
 
 			// Random data

http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 0907800..d4bed23 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -198,7 +198,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 				instanceManager,
 				scheduler,
 				blobServer,
-				new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
+				new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
new String[0]),
 				archive,
 				new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
 				timeout,
@@ -372,7 +372,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 				mock(InstanceManager.class),
 				mock(Scheduler.class),
 				blobServer,
-				new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
+				new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
new String[0]),
 				ActorRef.noSender(),
 				new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
 				timeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index b97bf1a..c3b57fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -191,7 +191,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 			new InstanceManager(),
 			new Scheduler(TestingUtils.defaultExecutionContext()),
 			blobServer,
-			new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
+			new BlobLibraryCacheManager(blobServer, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
new String[0]),
 			ActorRef.noSender(),
 			new NoRestartStrategy.NoRestartStrategyFactory(),
 			AkkaUtils.getDefaultTimeoutAsFiniteDuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 08ebbb3..10b74c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -123,7 +123,8 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger
{
 				1000000, // cleanup interval
 				config,
 				false, // exit-jvm-on-fatal-error
-				FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST);
+				FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
+				new String[0]);
 
 			final int networkBufNum = 32;
 			// note: the network buffer memory configured here is not actually used below but set

http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index 82b6b29..8072295 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -186,7 +186,8 @@ public class JvmExitOnFatalErrorTest {
 						blobService,
 						new BlobLibraryCacheManager(
 							blobService.getPermanentBlobService(),
-							FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
+							FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
+							new String[0]),
 						new FileCache(tmInfo.getTmpDirectories()),
 						tmInfo,
 						new UnregisteredTaskMetricsGroup(),

http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index 5a38615..a87e440 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -164,7 +164,8 @@ public class BlockingCheckpointsTest {
 				blobService,
 				new BlobLibraryCacheManager(
 					blobService.getPermanentBlobService(),
-					FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
+					FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
+					new String[0]),
 				new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
 				new TestingTaskManagerRuntimeInfo(),
 				new UnregisteredTaskMetricsGroup(),

http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index d049e31..c641aa8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -265,7 +265,8 @@ public class InterruptSensitiveRestoreTest {
 			blobService,
 			new BlobLibraryCacheManager(
 				blobService.getPermanentBlobService(),
-				FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
+				FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
+				new String[0]),
 			new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
 			new TestingTaskManagerRuntimeInfo(),
 			new UnregisteredTaskMetricsGroup(),

http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 7eb2655..b07b735 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -161,7 +161,8 @@ public class StreamTaskTerminationTest extends TestLogger {
 			blobService,
 			new BlobLibraryCacheManager(
 				blobService.getPermanentBlobService(),
-				FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST),
+				FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
+				new String[0]),
 			mock(FileCache.class),
 			taskManagerRuntimeInfo,
 			new UnregisteredTaskMetricsGroup(),

http://git-wip-us.apache.org/repos/asf/flink/blob/2d88ab85/test-infra/end-to-end-test/test_streaming_classloader.sh
----------------------------------------------------------------------
diff --git a/test-infra/end-to-end-test/test_streaming_classloader.sh b/test-infra/end-to-end-test/test_streaming_classloader.sh
index efbf98e..8bc6858 100755
--- a/test-infra/end-to-end-test/test_streaming_classloader.sh
+++ b/test-infra/end-to-end-test/test_streaming_classloader.sh
@@ -45,6 +45,7 @@ GIT_REMOTE_URL=`grep "git\.remote\.origin\.url" $TEST_INFRA_DIR/../../flink-runt
 
 # remove any leftover classloader settings
 sed -i -e 's/classloader.resolve-order: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
+sed -i -e 's/classloader.parent-first-patterns: .*//' $FLINK_DIR/conf/flink-conf.yaml
 echo "classloader.resolve-order: parent-first" >> "$FLINK_DIR/conf/flink-conf.yaml"
 
 start_cluster
@@ -68,11 +69,42 @@ if [[ "$OUTPUT" != "$EXPECTED" ]]; then
   PASS=""
 fi
 
+# This verifies that Flink classes are still resolved from the parent because the default
+# "classloader.parent-first-patterns" setting is "org.apache.flink"
+echo "Testing child-first class loading with Flink classes loaded via parent"
+
+# remove any leftover classloader settings
+sed -i -e 's/classloader.resolve-order: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
+sed -i -e 's/classloader.parent-first-patterns: .*//' $FLINK_DIR/conf/flink-conf.yaml
+echo "classloader.resolve-order: child-first" >> "$FLINK_DIR/conf/flink-conf.yaml"
+
+start_cluster
+
+$FLINK_DIR/bin/flink run -p 1 $TEST_PROGRAM_JAR --resolve-order parent-first --output $TEST_DATA_DIR/out/cl_out_cf_pf
+
+stop_cluster
+
+# remove classloader settings again
+sed -i -e 's/classloader.resolve-order: .*//' $FLINK_DIR/conf/flink-conf.yaml
+
+OUTPUT=`cat $TEST_DATA_DIR/out/cl_out_cf_pf`
+# first field: whether we found the method on TaskManager
+# result of getResource(".version.properties"), should be from the child
+# ordered result of getResources(".version.properties"), should be child first
+EXPECTED="NoSuchMethodError:hello-there-42:hello-there-42${GIT_REMOTE_URL}"
+if [[ "$OUTPUT" != "$EXPECTED" ]]; then
+  echo "Output from Flink program does not match expected output."
+  echo -e "EXPECTED: $EXPECTED"
+  echo -e "ACTUAL: $OUTPUT"
+  PASS=""
+fi
+
 echo "Testing child-first class loading"
 
 # remove any leftover classloader settings
 sed -i -e 's/classloader.resolve-order: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
 echo "classloader.resolve-order: child-first" >> "$FLINK_DIR/conf/flink-conf.yaml"
+echo "classloader.parent-first-patterns: foo.bar" >> "$FLINK_DIR/conf/flink-conf.yaml"
 
 start_cluster
 
@@ -82,6 +114,7 @@ stop_cluster
 
 # remove classloader settings again
 sed -i -e 's/classloader.resolve-order: .*//' $FLINK_DIR/conf/flink-conf.yaml
+sed -i -e 's/classloader.parent-first-patterns: .*//' $FLINK_DIR/conf/flink-conf.yaml
 
 OUTPUT=`cat $TEST_DATA_DIR/out/cl_out_cf`
 # first field: whether we found the method on TaskManager


Mime
View raw message