flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject flink git commit: [FLINK-7656] [runtime] Switch to user classloader before calling initializeOnMaster and finalizeOnMaster.
Date Fri, 22 Sep 2017 21:56:35 GMT
Repository: flink
Updated Branches:
  refs/heads/master d0debf4a8 -> 4afca4b3a


[FLINK-7656] [runtime] Switch to user classloader before calling initializeOnMaster and finalizeOnMaster.

This closes #4690.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4afca4b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4afca4b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4afca4b3

Branch: refs/heads/master
Commit: 4afca4b3a13b61c2754bc839c77ba4d4eb1d2da2
Parents: d0debf4
Author: Fabian Hueske <fhueske@apache.org>
Authored: Wed Sep 20 16:26:47 2017 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Fri Sep 22 18:32:29 2017 +0200

----------------------------------------------------------------------
 .../runtime/jobgraph/OutputFormatVertex.java    | 50 ++++++++-----
 .../runtime/jobgraph/JobTaskVertexTest.java     | 76 ++++++++++++++++----
 2 files changed, 98 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4afca4b3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
index c9ac564..77f207c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
@@ -73,15 +73,24 @@ public class OutputFormatVertex extends JobVertex {
 		catch (Throwable t) {
 			throw new Exception("Instantiating the OutputFormat (" + formatDescription + ") failed:
" + t.getMessage(), t);
 		}
+
+		// set user classloader before calling user code
+		final ClassLoader prevContextCl = Thread.currentThread().getContextClassLoader();
+		Thread.currentThread().setContextClassLoader(loader);
+
 		try {
-			outputFormat.configure(cfg.getStubParameters());
-		}
-		catch (Throwable t) {
-			throw new Exception("Configuring the OutputFormat (" + formatDescription + ") failed:
" + t.getMessage(), t);
-		}
-		
-		if (outputFormat instanceof InitializeOnMaster) {
-			((InitializeOnMaster) outputFormat).initializeGlobal(getParallelism());
+			// configure output format
+			try {
+				outputFormat.configure(cfg.getStubParameters());
+			} catch (Throwable t) {
+				throw new Exception("Configuring the OutputFormat (" + formatDescription + ") failed:
" + t.getMessage(), t);
+			}
+			if (outputFormat instanceof InitializeOnMaster) {
+				((InitializeOnMaster) outputFormat).initializeGlobal(getParallelism());
+			}
+		} finally {
+			// restore previous classloader
+			Thread.currentThread().setContextClassLoader(prevContextCl);
 		}
 	}
 	
@@ -107,15 +116,24 @@ public class OutputFormatVertex extends JobVertex {
 		catch (Throwable t) {
 			throw new Exception("Instantiating the OutputFormat (" + formatDescription + ") failed:
" + t.getMessage(), t);
 		}
+
+		// set user classloader before calling user code
+		final ClassLoader prevContextCl = Thread.currentThread().getContextClassLoader();
+		Thread.currentThread().setContextClassLoader(loader);
+
 		try {
-			outputFormat.configure(cfg.getStubParameters());
-		}
-		catch (Throwable t) {
-			throw new Exception("Configuring the OutputFormat (" + formatDescription + ") failed:
" + t.getMessage(), t);
-		}
-		
-		if (outputFormat instanceof FinalizeOnMaster) {
-			((FinalizeOnMaster) outputFormat).finalizeGlobal(getParallelism());
+			// configure output format
+			try {
+				outputFormat.configure(cfg.getStubParameters());
+			} catch (Throwable t) {
+				throw new Exception("Configuring the OutputFormat (" + formatDescription + ") failed:
" + t.getMessage(), t);
+			}
+			if (outputFormat instanceof FinalizeOnMaster) {
+				((FinalizeOnMaster) outputFormat).finalizeGlobal(getParallelism());
+			}
+		} finally {
+			// restore previous classloader
+			Thread.currentThread().setContextClassLoader(prevContextCl);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4afca4b3/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
index 970437a..794c5c6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.runtime.jobgraph;
 
+import org.apache.flink.api.common.io.FinalizeOnMaster;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.common.io.InitializeOnMaster;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -33,6 +35,8 @@ import org.apache.flink.util.InstantiationUtil;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
 
 import static org.junit.Assert.*;
 
@@ -84,10 +88,10 @@ public class JobTaskVertexTest {
 	@Test
 	public void testOutputFormatVertex() {
 		try {
-			final TestingOutputFormat outputFormat = new TestingOutputFormat();
+			final OutputFormat outputFormat = new TestingOutputFormat();
 			final OutputFormatVertex of = new OutputFormatVertex("Name");
 			new TaskConfig(of.getConfiguration()).setStubWrapper(new UserCodeObjectWrapper<OutputFormat<?>>(outputFormat));
-			final ClassLoader cl = getClass().getClassLoader();
+			final ClassLoader cl = new TestClassLoader();
 			
 			try {
 				of.initializeOnMaster(cl);
@@ -97,19 +101,29 @@ public class JobTaskVertexTest {
 			}
 			
 			OutputFormatVertex copy = InstantiationUtil.clone(of);
+			ClassLoader ctxCl = Thread.currentThread().getContextClassLoader();
 			try {
 				copy.initializeOnMaster(cl);
 				fail("Did not throw expected exception.");
 			} catch (TestException e) {
 				// all good
 			}
+			assertEquals("Previous classloader was not restored.", ctxCl, Thread.currentThread().getContextClassLoader());
+
+			try {
+				copy.finalizeOnMaster(cl);
+				fail("Did not throw expected exception.");
+			} catch (TestException e) {
+				// all good
+			}
+			assertEquals("Previous classloader was not restored.", ctxCl, Thread.currentThread().getContextClassLoader());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testInputFormatVertex() {
 		try {
@@ -134,17 +148,8 @@ public class JobTaskVertexTest {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	private static final class TestingOutputFormat extends DiscardingOutputFormat<Object>
implements InitializeOnMaster {
-		@Override
-		public void initializeGlobal(int parallelism) throws IOException {
-			throw new TestException();
-		}
-	}
-	
 	private static final class TestException extends IOException {}
 	
-	// --------------------------------------------------------------------------------------------
-	
 	private static final class TestSplit extends GenericInputSplit {
 		
 		public TestSplit(int partitionNumber, int totalNumberOfPartitions) {
@@ -169,4 +174,51 @@ public class JobTaskVertexTest {
 			return new GenericInputSplit[] { new TestSplit(0, 1) };
 		}
 	}
+
+	private static final class TestingOutputFormat extends DiscardingOutputFormat<Object>
implements InitializeOnMaster, FinalizeOnMaster {
+
+		private boolean isConfigured = false;
+
+		@Override
+		public void initializeGlobal(int parallelism) throws IOException {
+			if (!isConfigured) {
+				throw new IllegalStateException("OutputFormat was not configured before initializeGlobal
was called.");
+			}
+			if (!(Thread.currentThread().getContextClassLoader() instanceof TestClassLoader)) {
+				throw new IllegalStateException("Context ClassLoader was not correctly switched.");
+			}
+			// notify we have been here.
+			throw new TestException();
+		}
+
+		@Override
+		public void finalizeGlobal(int parallelism) throws IOException {
+			if (!isConfigured) {
+				throw new IllegalStateException("OutputFormat was not configured before finalizeGlobal
was called.");
+			}
+			if (!(Thread.currentThread().getContextClassLoader() instanceof TestClassLoader)) {
+				throw new IllegalStateException("Context ClassLoader was not correctly switched.");
+			}
+			// notify we have been here.
+			throw new TestException();
+		}
+
+		@Override
+		public void configure(Configuration parameters) {
+			if (isConfigured) {
+				throw new IllegalStateException("OutputFormat is already configured.");
+			}
+			if (!(Thread.currentThread().getContextClassLoader() instanceof TestClassLoader)) {
+				throw new IllegalStateException("Context ClassLoader was not correctly switched.");
+			}
+			isConfigured = true;
+		}
+
+	}
+
+	private static class TestClassLoader extends URLClassLoader {
+		public TestClassLoader() {
+			super(new URL[0], Thread.currentThread().getContextClassLoader());
+		}
+	}
 }


Mime
View raw message