flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject flink git commit: [FLINK-7490] [table] Use correct classloader to compile generated code that calls UDAGGs.
Date Wed, 15 Nov 2017 16:55:54 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.4 06a922e81 -> 084ff68d5


[FLINK-7490] [table] Use correct classloader to compile generated code that calls UDAGGs.

This closes #5018.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/084ff68d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/084ff68d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/084ff68d

Branch: refs/heads/release-1.4
Commit: 084ff68d5434805d9fc4208fd52f04c2201e362c
Parents: 06a922e
Author: Fabian Hueske <fhueske@apache.org>
Authored: Wed Nov 15 10:12:55 2017 +0100
Committer: twalthr <twalthr@apache.org>
Committed: Wed Nov 15 17:55:25 2017 +0100

----------------------------------------------------------------------
 .../operators/translation/RichCombineToGroupCombineWrapper.java    | 1 +
 .../flink/table/runtime/aggregate/AggregateAggFunction.scala       | 2 +-
 .../apache/flink/table/runtime/aggregate/DataSetAggFunction.scala  | 2 +-
 .../flink/table/runtime/aggregate/DataSetFinalAggFunction.scala    | 2 +-
 .../flink/table/runtime/aggregate/DataSetPreAggFunction.scala      | 2 +-
 .../aggregate/DataSetSessionWindowAggReduceGroupFunction.scala     | 2 +-
 .../aggregate/DataSetSessionWindowAggregatePreProcessor.scala      | 2 +-
 .../aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala   | 2 +-
 .../aggregate/DataSetSlideWindowAggReduceCombineFunction.scala     | 2 +-
 .../aggregate/DataSetSlideWindowAggReduceGroupFunction.scala       | 2 +-
 .../aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala | 2 +-
 .../DataSetTumbleTimeWindowAggReduceCombineFunction.scala          | 2 +-
 .../aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala  | 2 +-
 13 files changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/084ff68d/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
index 3f6463a..9cbda50 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
@@ -42,6 +42,7 @@ public class RichCombineToGroupCombineWrapper<IN, OUT, F extends RichGroupReduce
 
 	@Override
 	public void open(Configuration config) throws Exception {
+		wrappedFunction.setRuntimeContext(getRuntimeContext());
 		wrappedFunction.open(config);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/084ff68d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
index 330386b..4dbaeea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
@@ -70,7 +70,7 @@ class AggregateAggFunction(genAggregations: GeneratedAggregationsFunction)
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      Thread.currentThread().getContextClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/084ff68d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
index bc0c163..ced1450 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
@@ -46,7 +46,7 @@ class DataSetAggFunction(
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/084ff68d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
index 3b3be70..f2eb3d9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
@@ -47,7 +47,7 @@ class DataSetFinalAggFunction(
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/084ff68d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
index fc3366b..744a739 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
@@ -48,7 +48,7 @@ class DataSetPreAggFunction(genAggregations: GeneratedAggregationsFunction)
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/084ff68d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
index 372fc0d..0d54de6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
@@ -72,7 +72,7 @@ class DataSetSessionWindowAggReduceGroupFunction(
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/084ff68d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
index 666bfee..35e8142 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
@@ -59,7 +59,7 @@ class DataSetSessionWindowAggregatePreProcessor(
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/084ff68d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
index 3af7969..f2987a7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
@@ -68,7 +68,7 @@ class DataSetSlideTimeWindowAggReduceGroupFunction(
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/084ff68d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
index 2da838f..6a9d631 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
@@ -67,7 +67,7 @@ class DataSetSlideWindowAggReduceCombineFunction(
     LOG.debug(s"Compiling AggregateHelper: $genPreAggregations.name \n\n " +
       s"Code:\n$genPreAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genPreAggregations.name,
       genPreAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/084ff68d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
index 474a09b..f96e841 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
@@ -63,7 +63,7 @@ class DataSetSlideWindowAggReduceGroupFunction(
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/084ff68d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
index 22fe389..f4d347a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala
@@ -50,7 +50,7 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/084ff68d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
index 9eeab33..a3a72ae 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala
@@ -66,7 +66,7 @@ class DataSetTumbleTimeWindowAggReduceCombineFunction(
     LOG.debug(s"Compiling AggregateHelper: $genPreAggregations.name \n\n " +
       s"Code:\n$genPreAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genPreAggregations.name,
       genPreAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")

http://git-wip-us.apache.org/repos/asf/flink/blob/084ff68d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
index 4e92148..14e89ad 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
@@ -62,7 +62,7 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
                 s"Code:\n$genAggregations.code")
     val clazz = compile(
-      getClass.getClassLoader,
+      getRuntimeContext.getUserCodeClassLoader,
       genAggregations.name,
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")


Mime
View raw message