flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [2/2] flink git commit: [FLINK-924] Add JarFileCreator tests for anonymous classes and Java 8 lambdas
Date Tue, 28 Apr 2015 07:32:09 GMT
[FLINK-924] Add JarFileCreator tests for anonymous classes and Java 8 lambdas

This closes #35


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64607433
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64607433
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64607433

Branch: refs/heads/master
Commit: 64607433d449214dbdee326c5a798fe36d1ed0ca
Parents: c527a2b
Author: mingliang <qmlmoon@gmail.com>
Authored: Tue Oct 28 14:52:56 2014 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Tue Apr 28 09:31:24 2015 +0200

----------------------------------------------------------------------
 .../runtime/util/JarFileCreatorLambdaTest.java  | 111 +++++++++++++
 .../util/jartestprogram/FilterLambda1.java      |  39 +++++
 .../util/jartestprogram/FilterLambda2.java      |  35 ++++
 .../util/jartestprogram/FilterLambda3.java      |  36 +++++
 .../util/jartestprogram/FilterLambda4.java      |  35 ++++
 .../util/jartestprogram/UtilFunction.java       |  27 ++++
 .../jartestprogram/UtilFunctionWrapper.java     |  29 ++++
 .../runtime/util/jartestprogram/WordFilter.java |  25 +++
 flink-runtime/pom.xml                           |   2 +-
 .../flink/runtime/util/DependencyVisitor.java   |  30 ----
 .../flink/runtime/util/JarFileCreatorTest.java  | 162 +++++++++++++------
 .../AnonymousInNonStaticMethod.java             |  39 +++++
 .../AnonymousInNonStaticMethod2.java            |  40 +++++
 .../jartestprogram/AnonymousInStaticMethod.java |  40 +++++
 .../NestedAnonymousInnerClass.java              |  45 ++++++
 pom.xml                                         |   2 +-
 16 files changed, 617 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/64607433/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
new file mode 100644
index 0000000..249e082
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+
+import org.apache.flink.runtime.util.jartestprogram.FilterLambda1;
+import org.apache.flink.runtime.util.jartestprogram.FilterLambda2;
+import org.apache.flink.runtime.util.jartestprogram.FilterLambda3;
+import org.apache.flink.runtime.util.jartestprogram.FilterLambda4;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.jar.JarInputStream;
+import java.util.zip.ZipEntry;
+
+public class JarFileCreatorLambdaTest {
+	@Test
+	public void TestFilterFunctionOnLambda1() throws Exception {
+		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
+		JarFileCreator jfc = new JarFileCreator(out);
+		jfc.addClass(FilterLambda1.class)
+			.createJarFile();
+
+		Set<String> ans = new HashSet<String>();
+		ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda1.class");
+		ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
+
+		Assert.assertTrue("Jar file for java 8 lambda is not correct", validate(ans, out));
+		out.delete();
+	}
+
+	@Test
+	public void TestFilterFunctionOnLambda2() throws Exception{
+		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
+		JarFileCreator jfc = new JarFileCreator(out);
+		jfc.addClass(FilterLambda2.class)
+			.createJarFile();
+
+		Set<String> ans = new HashSet<String>();
+		ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda2.class");
+		ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
+
+		Assert.assertTrue("Jar file for java 8 lambda is not correct", validate(ans, out));
+		out.delete();
+	}
+
+	@Test
+	public void TestFilterFunctionOnLambda3() throws Exception {
+		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
+		JarFileCreator jfc = new JarFileCreator(out);
+		jfc.addClass(FilterLambda3.class)
+			.createJarFile();
+
+		Set<String> ans = new HashSet<String>();
+		ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda3.class");
+		ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
+		ans.add("org/apache/flink/runtime/util/jartestprogram/UtilFunction.class");
+
+		Assert.assertTrue("Jar file for java 8 lambda is not correct", validate(ans, out));
+		out.delete();
+	}
+
+	@Test
+	public void TestFilterFunctionOnLambda4() throws Exception {
+		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
+		JarFileCreator jfc = new JarFileCreator(out);
+		jfc.addClass(FilterLambda4.class)
+			.createJarFile();
+
+		Set<String> ans = new HashSet<String>();
+		ans.add("org/apache/flink/runtime/util/jartestprogram/FilterLambda4.class");
+		ans.add("org/apache/flink/runtime/util/jartestprogram/WordFilter.class");
+		ans.add("org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper$UtilFunction.class");
+
+		Assert.assertTrue("Jar file for java 8 lambda is not correct", validate(ans, out));
+		out.delete();
+	}
+
+	public boolean validate(Set<String> expected, File out) throws Exception {
+
+		JarInputStream jis = new JarInputStream(new FileInputStream(out));
+		ZipEntry ze;
+		int count = expected.size();
+		while ((ze = jis.getNextEntry()) != null) {
+			count--;
+			expected.remove(ze.getName());
+		}
+		return count == 0 && expected.size() == 0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/64607433/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
new file mode 100644
index 0000000..cd0c9e7
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda1.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+public class FilterLambda1 {
+
+	public static void main(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");
+
+		FilterFunction<String> filter = (v) -> WordFilter.filter(v);
+
+		DataSet<String> output = input.filter(filter);
+		output.print();
+
+		env.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/64607433/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
new file mode 100644
index 0000000..06c279d
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda2.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+public class FilterLambda2 {
+
+	public static void main(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");
+
+		DataSet<String> output = input.filter((v) -> WordFilter.filter(v));
+		output.print();
+
+		env.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/64607433/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
new file mode 100644
index 0000000..27fd2d9
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda3.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+public class FilterLambda3 {
+
+	public static void main(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");
+
+		DataSet<String> output = input.filter(UtilFunction.getWordFilter());
+		output.print();
+
+		env.execute();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/64607433/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
new file mode 100644
index 0000000..e66adb0
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/FilterLambda4.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+public class FilterLambda4 {
+
+	public static void main(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<String> input = env.fromElements("Please filter", "the words", "but not this");
+
+		DataSet<String> output = input.filter(UtilFunctionWrapper.UtilFunction.getWordFilter());
+		output.print();
+
+		env.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/64607433/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
new file mode 100644
index 0000000..e662015
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunction.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+public class UtilFunction {
+	public static FilterFunction<String> getWordFilter() {
+		return (v) -> WordFilter.filter(v);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/64607433/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
new file mode 100644
index 0000000..f97cdd8
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/UtilFunctionWrapper.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+public class UtilFunctionWrapper {
+	public static class UtilFunction {
+		public static FilterFunction<String> getWordFilter() {
+			return (v) -> WordFilter.filter(v);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/64607433/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
new file mode 100644
index 0000000..c6f833a
--- /dev/null
+++ b/flink-java8/src/test/java/org/apache/flink/runtime/util/jartestprogram/WordFilter.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+public class WordFilter {
+	public static boolean filter(String value) {
+		return !value.contains("not");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/64607433/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index cc03eab..f490ed9 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -119,7 +119,7 @@ under the License.
 		<dependency>
 			<groupId>org.ow2.asm</groupId>
 			<artifactId>asm-all</artifactId>
-			<version>5.0.3</version>
+			<version>${asm.version}</version>
 		</dependency>
 		
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/64607433/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java
index 5119a38..469dc77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java
@@ -23,15 +23,12 @@ import org.objectweb.asm.ClassVisitor;
 import org.objectweb.asm.Opcodes;
 import org.objectweb.asm.FieldVisitor;
 import org.objectweb.asm.MethodVisitor;
-import org.objectweb.asm.ClassReader;
 import org.objectweb.asm.Type;
 import org.objectweb.asm.TypePath;
 import org.objectweb.asm.Label;
 import org.objectweb.asm.signature.SignatureReader;
 import org.objectweb.asm.signature.SignatureVisitor;
 
-import java.io.IOException;
-import java.io.InputStream;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -43,12 +40,8 @@ import java.util.Set;
  */
 public class DependencyVisitor extends ClassVisitor {
 
-	private static final String CLASS_EXTENSION = ".class";
-
 	private Set<String> packages = new HashSet<String>();
 
-	private Set<String> innerClasses = new HashSet<String>();
-
 	private Set<String> nameSpace = new HashSet<String>();
 
 	public Set<String> getPackages() {
@@ -100,29 +93,6 @@ public class DependencyVisitor extends ClassVisitor {
 		return new MethodVisitorImpl(Opcodes.ASM5);
 	}
 
-	@Override
-	public void visitInnerClass(String name, String outerName, String innerName, int access) {
-		if (!innerClasses.contains(name)) {
-			try {
-				innerClasses.add(name);
-				Class clazz = Class.forName(name.replace('/', '.'));
-				int n = name.lastIndexOf('/');
-				String className = null;
-				if (n > -1) {
-					className = name.substring(n + 1, name.length());
-				}
-
-				InputStream classInputStream = clazz.getResourceAsStream(className + CLASS_EXTENSION);
-				new ClassReader(classInputStream).accept(this, 0);
-				classInputStream.close();
-			} catch (ClassNotFoundException e) {
-				e.printStackTrace();
-			} catch (IOException e) {
-				e.printStackTrace();
-			}
-		}
-	}
-
 	// ---------------------------------------------------------------------------------------------
 
 	public void addNameSpace(Set<String> names) {

http://git-wip-us.apache.org/repos/asf/flink/blob/64607433/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
index a5f9de9..ba207ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
@@ -23,6 +23,10 @@ import org.apache.flink.runtime.util.jartestprogram.WordCountWithAnonymousClass;
 import org.apache.flink.runtime.util.jartestprogram.WordCountWithExternalClass;
 import org.apache.flink.runtime.util.jartestprogram.WordCountWithExternalClass2;
 import org.apache.flink.runtime.util.jartestprogram.WordCountWithInnerClass;
+import org.apache.flink.runtime.util.jartestprogram.AnonymousInStaticMethod;
+import org.apache.flink.runtime.util.jartestprogram.AnonymousInNonStaticMethod;
+import org.apache.flink.runtime.util.jartestprogram.AnonymousInNonStaticMethod2;
+import org.apache.flink.runtime.util.jartestprogram.NestedAnonymousInnerClass;
 import org.junit.Assert;
 import org.junit.Test;
 import java.io.File;
@@ -36,118 +40,180 @@ import java.util.zip.ZipEntry;
 
 public class JarFileCreatorTest {
 
+	//anonymous inner class in static method accessing a local variable in its closure.
+	@Test
+	public void TestAnonymousInnerClassTrick1() throws Exception {
+		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
+		JarFileCreator jfc = new JarFileCreator(out);
+		jfc.addClass(AnonymousInStaticMethod.class)
+			.createJarFile();
+
+		Set<String> ans = new HashSet<String>();
+		ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInStaticMethod$1.class");
+		ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInStaticMethod$A.class");
+		ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInStaticMethod.class");
+
+		Assert.assertTrue("Jar file for Anonymous Inner Class is not correct", validate(ans, out));
+
+		out.delete();
+	}
+
+	//anonymous inner class in non static method accessing a local variable in its closure.
+	@Test
+	public void TestAnonymousInnerClassTrick2() throws Exception {
+		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
+		JarFileCreator jfc = new JarFileCreator(out);
+		jfc.addClass(AnonymousInNonStaticMethod.class)
+			.createJarFile();
+
+		Set<String> ans = new HashSet<String>();
+		ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod$1.class");
+		ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod$A.class");
+		ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod.class");
+
+		Assert.assertTrue("Jar file for Anonymous Inner Class is not correct", validate(ans, out));
+
+		out.delete();
+	}
+
+	//anonymous inner class in non static method accessing a field of its enclosing class.
+	@Test
+	public void TestAnonymousInnerClassTrick3() throws Exception {
+		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
+		JarFileCreator jfc = new JarFileCreator(out);
+		jfc.addClass(AnonymousInNonStaticMethod2.class)
+			.createJarFile();
+
+		Set<String> ans = new HashSet<String>();
+		ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod2$1.class");
+		ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod2$A.class");
+		ans.add("org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod2.class");
+
+		Assert.assertTrue("Jar file for Anonymous Inner Class is not correct", validate(ans, out));
+
+		out.delete();
+	}
+
+	//anonymous inner class in an anonymous inner class accessing a field of the outermost enclosing class.
+	@Test
+	public void TestAnonymousInnerClassTrick4() throws Exception {
+		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
+		JarFileCreator jfc = new JarFileCreator(out);
+		jfc.addClass(NestedAnonymousInnerClass.class)
+			.createJarFile();
+
+		Set<String> ans = new HashSet<String>();
+		ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass.class");
+		ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass$1$1.class");
+		ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass$1.class");
+		ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass$A.class");
+
+		Assert.assertTrue("Jar file for Anonymous Inner Class is not correct", validate(ans, out));
+
+		out.delete();
+	}
+
+	//----------------------------------------------------------------------------------------------
+	//Word Count Example
+
 	@Test
 	public void TestExternalClass() throws IOException {
-		File out = new File("/tmp/jarcreatortest1.jar");
+		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
 		JarFileCreator jfc = new JarFileCreator(out);
-		jfc.addClass(WordCountWithExternalClass.class).createJarFile();
+		jfc.addClass(WordCountWithExternalClass.class)
+			.createJarFile();
 
 		Set<String> ans = new HashSet<String>();
 		ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer.class");
-		JarInputStream jis = new JarInputStream(new FileInputStream(out));
-		ZipEntry ze;
-		int count = 3;
-		while ((ze = jis.getNextEntry()) != null) {
-			count--;
-			ans.remove(ze.getName());
-		}
-		Assert.assertTrue("Jar file for External Class is not correct", count == 0 && ans.size() == 0);
+
+		Assert.assertTrue("Jar file for External Class is not correct", validate(ans, out));
 
 		out.delete();
 	}
 
 	@Test
 	public void TestInnerClass() throws IOException {
-		File out = new File("/tmp/jarcreatortest2.jar");
+		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
 		JarFileCreator jfc = new JarFileCreator(out);
-		jfc.addClass(WordCountWithInnerClass.class).createJarFile();
+		jfc.addClass(WordCountWithInnerClass.class)
+			.createJarFile();
 
 		Set<String> ans = new HashSet<String>();
 		ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass$Tokenizer.class");
-		JarInputStream jis = new JarInputStream(new FileInputStream(out));
-		ZipEntry ze;
-		int count = 3;
-		while ((ze = jis.getNextEntry()) != null) {
-			count--;
-			ans.remove(ze.getName());
-		}
-		Assert.assertTrue("Jar file for Inner Class is not correct", count == 0 && ans.size() == 0);
+
+		Assert.assertTrue("Jar file for Inner Class is not correct", validate(ans, out));
 
 		out.delete();
 	}
 
 	@Test
 	public void TestAnonymousClass() throws IOException {
-		File out = new File("/tmp/jarcreatortest3.jar");
+		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
 		JarFileCreator jfc = new JarFileCreator(out);
-		jfc.addClass(WordCountWithAnonymousClass.class).createJarFile();
+		jfc.addClass(WordCountWithAnonymousClass.class)
+			.createJarFile();
 
 		Set<String> ans = new HashSet<String>();
 		ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithAnonymousClass$1.class");
-		JarInputStream jis = new JarInputStream(new FileInputStream(out));
-		ZipEntry ze;
-		int count = 3;
-		while ((ze = jis.getNextEntry()) != null) {
-			count--;
-			ans.remove(ze.getName());
-		}
-		Assert.assertTrue("Jar file for Anonymous Class is not correct", count == 0 && ans.size() == 0);
+
+		Assert.assertTrue("Jar file for Anonymous Class is not correct", validate(ans, out));
 
 		out.delete();
 	}
 
 	@Test
 	public void TestExtendIdentifier() throws IOException {
-		File out = new File("/tmp/jarcreatortest4.jar");
+		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
 		JarFileCreator jfc = new JarFileCreator(out);
-		jfc.addClass(WordCountWithExternalClass2.class).createJarFile();
+		jfc.addClass(WordCountWithExternalClass2.class)
+			.createJarFile();
 
 		Set<String> ans = new HashSet<String>();
 		ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithExternalClass2.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer2.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/ExternalTokenizer.class");
-		JarInputStream jis = new JarInputStream(new FileInputStream(out));
-		ZipEntry ze;
-		int count = 4;
-		while ((ze = jis.getNextEntry()) != null) {
-			count--;
-			ans.remove(ze.getName());
-		}
-		Assert.assertTrue("Jar file for Extend Identifier is not correct", count == 0 && ans.size() == 0);
+
+		Assert.assertTrue("Jar file for Extend Identifier is not correct", validate(ans, out));
 
 		out.delete();
 	}
 
 	@Test
 	public void TestUDFPackage() throws IOException {
-		File out = new File("/tmp/jarcreatortest5.jar");
+		File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
 		JarFileCreator jfc = new JarFileCreator(out);
 		jfc.addClass(WordCountWithInnerClass.class)
-			.addPackage("org.apache.flink.util").createJarFile();
+			.addPackage("org.apache.flink.util")
+			.createJarFile();
 
 		Set<String> ans = new HashSet<String>();
 		ans.add("org/apache/flink/runtime/util/jartestprogram/StaticData.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass.class");
 		ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass$Tokenizer.class");
 		ans.add("org/apache/flink/util/Collector.class");
+
+		Assert.assertTrue("Jar file for UDF package is not correct", validate(ans, out));
+
+		out.delete();
+	}
+
+	private boolean validate(Set<String> expected, File out) throws IOException {
+
 		JarInputStream jis = new JarInputStream(new FileInputStream(out));
 		ZipEntry ze;
-		int count = 4;
+		int count = expected.size();
 		while ((ze = jis.getNextEntry()) != null) {
 			count--;
-			ans.remove(ze.getName());
+			expected.remove(ze.getName());
 		}
-		Assert.assertTrue("Jar file for UDF package is not correct", count == 0 && ans.size() == 0);
-
-		out.delete();
+		return count == 0 && expected.size() == 0;
 	}
-
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/64607433/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod.java
new file mode 100644
index 0000000..ccceca8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+/**
+ * anonymous inner class in non static method accessing a local variable in its closure.
+ */
+public class AnonymousInNonStaticMethod {
+
+	public void testMethod() {
+		final int i = 1;
+		new A(){
+			@Override
+			public void t() {
+				System.out.println(i);
+			}
+		};
+	}
+
+	public interface A {
+		void t();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/64607433/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod2.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod2.java
new file mode 100644
index 0000000..5e05dc0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/AnonymousInNonStaticMethod2.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+/**
+ * anonymous inner class in non static method accessing a field of its enclosing class.
+ */
+public class AnonymousInNonStaticMethod2 {
+	//
+	public void testMethod() {
+		new A(){
+			@Override
+			public void t() {
+				System.out.println(s);
+			}
+		};
+	}
+
+	private int s = 1;
+
+	public interface A {
+		void t();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/64607433/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/AnonymousInStaticMethod.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/AnonymousInStaticMethod.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/AnonymousInStaticMethod.java
new file mode 100644
index 0000000..93b2587
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/AnonymousInStaticMethod.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+
+/**
+ * anonymous inner class in static method accessing a local variable in its closure.
+ */
+public class AnonymousInStaticMethod {
+
+	public static void testMethod() {
+		final int i = 1;
+		new A(){
+			@Override
+			public void t() {
+				System.out.println(i);
+			}
+		};
+	}
+
+	public interface A {
+		void t();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/64607433/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass.java
new file mode 100644
index 0000000..8c3e1b1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.jartestprogram;
+
+/**
+ * anonymous inner class in an anonymous inner class accessing a field of the outermost enclosing class.
+ */
+public class NestedAnonymousInnerClass {
+
+	public void testMethod() {
+		new A() {
+			@Override
+			public void t() {
+				new A() {
+					@Override
+					public void t() {
+						System.out.println(s);
+					}
+				};
+			}
+		};
+	}
+
+	private int s = 1;
+
+	public interface A {
+		void t();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/64607433/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b059e32..524e45f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,7 +85,7 @@ under the License.
 		<kryoserialization.version>0.3.2</kryoserialization.version>
 		<protobuf.version>2.5.0</protobuf.version>
 		<chill.version>0.5.2</chill.version>
-		<asm.version>4.0</asm.version>
+		<asm.version>5.0.3</asm.version>
         <tez.version>0.6.0</tez.version>
 	</properties>
 


Mime
View raw message