flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [1/2] flink git commit: [cascading] load user classloader when configuring InputFormat
Date Thu, 30 Jul 2015 13:06:51 GMT
Repository: flink
Updated Branches:
  refs/heads/master 73af89114 -> ca6dd4275


[cascading] load user classloader when configuring InputFormat

This closes #950.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca6dd427
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca6dd427
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca6dd427

Branch: refs/heads/master
Commit: ca6dd42759f180779f9dfef63535f297fcb2eaf0
Parents: 466f9bf
Author: Maximilian Michels <mxm@apache.org>
Authored: Wed Jul 29 14:51:14 2015 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Thu Jul 30 15:06:06 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/jobgraph/InputFormatVertex.java | 8 +++++++-
 .../org/apache/flink/runtime/operators/DataSinkTask.java     | 8 +++++++-
 .../org/apache/flink/runtime/operators/DataSourceTask.java   | 8 +++++++-
 3 files changed, 21 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ca6dd427/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
index 011850c..781108c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java
@@ -71,14 +71,20 @@ public class InputFormatVertex extends JobVertex {
 		catch (Throwable t) {
 			throw new Exception("Instantiating the InputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
 		}
-		
+
+		Thread thread = Thread.currentThread();
+		ClassLoader original = thread.getContextClassLoader();
 		// configure
 		try {
+			thread.setContextClassLoader(loader);
 			inputFormat.configure(cfg.getStubParameters());
 		}
 		catch (Throwable t) {
 			throw new Exception("Configuring the InputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
 		}
+		finally {
+			thread.setContextClassLoader(original);
+		}
 		
 		setInputSplitSource(inputFormat);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ca6dd427/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index b3130a1..d291b5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -323,15 +323,21 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 		catch (ClassCastException ccex) {
 			throw new RuntimeException("The stub class is not a proper subclass of " + OutputFormat.class.getName(), ccex);
 		}
-		
+
+		Thread thread = Thread.currentThread();
+		ClassLoader original = thread.getContextClassLoader();
 		// configure the stub. catch exceptions here extra, to report them as originating from the user code 
 		try {
+			thread.setContextClassLoader(userCodeClassLoader);
 			this.format.configure(this.config.getStubParameters());
 		}
 		catch (Throwable t) {
 			throw new RuntimeException("The user defined 'configure()' method in the Output Format caused an error: " 
 				+ t.getMessage(), t);
 		}
+		finally {
+			thread.setContextClassLoader(original);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ca6dd427/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 3f1c642..df81408 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -266,13 +266,19 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 					ccex);
 		}
 
-		// configure the stub. catch exceptions here extra, to report them as originating from the user code 
+		Thread thread = Thread.currentThread();
+		ClassLoader original = thread.getContextClassLoader();
+		// configure the stub. catch exceptions here extra, to report them as originating from the user code
 		try {
+			thread.setContextClassLoader(userCodeClassLoader);
 			this.format.configure(this.config.getStubParameters());
 		}
 		catch (Throwable t) {
 			throw new RuntimeException("The user defined 'configure()' method caused an error: " + t.getMessage(), t);
 		}
+		finally {
+			thread.setContextClassLoader(original);
+		}
 
 		// get the factory for the type serializer
 		this.serializerFactory = this.config.getOutputSerializer(userCodeClassLoader);


Mime
View raw message