flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [FLINK-2509] [runtime] Add class loader info into the exception message when user code classes are not found.
Date Wed, 12 Aug 2015 08:06:02 GMT
Repository: flink
Updated Branches:
  refs/heads/master d8d074809 -> eeec1912b


[FLINK-2509] [runtime] Add class loader info into the exception message when user code classes
are not found.

This closes #1008


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eeec1912
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eeec1912
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eeec1912

Branch: refs/heads/master
Commit: eeec1912b478ed43a045449d82e0a2fd3700d720
Parents: d8d0748
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Aug 11 16:07:22 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Aug 12 10:05:13 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/util/ClassLoaderUtil.java     | 126 ++++++++++++++++++
 .../runtime/util/ClassLoaderUtilsTest.java      | 131 +++++++++++++++++++
 .../flink/streaming/api/graph/StreamConfig.java |  23 +++-
 3 files changed, 276 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eeec1912/flink-runtime/src/main/java/org/apache/flink/runtime/util/ClassLoaderUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ClassLoaderUtil.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ClassLoaderUtil.java
new file mode 100644
index 0000000..fbb707e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ClassLoaderUtil.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.jar.JarFile;
+
+/**
+ * Utilities for information with respect to class loaders, specifically class loaders for
+ * the dynamic loading of user defined classes.
+ */
+public class ClassLoaderUtil {
+
+	/**
+	 * Gets information about URL class loaders. The returned info string contains all URLs of the
+	 * class loader. For file URLs, it contains in addition whether the referenced file exists,
+	 * is a valid JAR file, or is a directory.
+	 * 
+	 * <p>NOTE: This method makes a best effort to provide information about the classloader, and
+	 * never throws an exception.</p>
+	 * 
+	 * @param loader The classloader to get the info string for; may be null or a non-URL loader.
+	 * @return The classloader information string, or a fixed notice if it is no URLClassLoader.
+	 */
+	public static String getUserCodeClassLoaderInfo(ClassLoader loader) {
+		if (loader instanceof URLClassLoader) {
+			URLClassLoader cl = (URLClassLoader) loader;
+
+			try {
+				StringBuilder bld = new StringBuilder();
+
+				if (cl == ClassLoader.getSystemClassLoader()) {
+					bld.append("System ClassLoader: ");
+				}
+				else {
+					bld.append("URL ClassLoader:");
+				}
+
+				for (URL url : cl.getURLs()) {
+					bld.append("\n    ");
+					if (url == null) {
+						bld.append("(null)");
+					}
+					else if ("file".equals(url.getProtocol())) {
+						String filePath = url.getPath();
+						File fileFile = new File(filePath);
+
+						bld.append("file: '").append(filePath).append('\'');
+
+						if (fileFile.exists()) {
+							if (fileFile.isDirectory()) {
+								bld.append(" (directory)");
+							}
+							else {
+								try {
+									new JarFile(filePath).close(); // opened only to validate; release the file handle
+									bld.append(" (valid JAR)");
+								}
+								catch (Exception e) {
+									bld.append(" (invalid JAR: ").append(e.getMessage()).append(')');
+								}
+							}
+						}
+						else {
+							bld.append(" (missing)");
+						}
+					}
+					else {
+						bld.append("url: ").append(url);
+					}
+				}
+
+				return bld.toString();
+			}
+			catch (Throwable t) {
+				return "Cannot access classloader info due to an exception.\n"
+						+ ExceptionUtils.stringifyException(t);
+			}
+		}
+		else {
+			return "No user code ClassLoader";
+		}
+	}
+
+	/**
+	 * Checks, whether the class that was not found in the given exception, can be resolved through
+	 * the given class loader.
+	 * 
+	 * @param cnfe The ClassNotFoundException that defines the name of the class.
+	 * @param cl The class loader to use for the class resolution.
+	 * @return True, if the class can be resolved with the given class loader, false if not.
+	 */
+	public static boolean validateClassLoadable(ClassNotFoundException cnfe, ClassLoader cl) {
+		try {
+			String className = cnfe.getMessage();
+			Class.forName(className, false, cl);
+			return true;
+		}
+		catch (Exception e) {
+			return false; // class not found, or the exception carries no usable class name
+		}
+		catch (LinkageError e) {
+			return false; // linkage failed (e.g. NoClassDefFoundError); must not escape this diagnostic helper
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/eeec1912/flink-runtime/src/test/java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java
new file mode 100644
index 0000000..d5f3c9e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.jar.JarFile;
+
+/**
+ * Tests that validate the {@link ClassLoaderUtil}.
+ */
+public class ClassLoaderUtilsTest {
+
+	@Test
+	public void testWithURLClassLoader() {
+		File validJar = null;
+		File invalidJar = null;
+
+		try {
+			// file with jar contents
+			validJar = File.createTempFile("flink-url-test", ".tmp");
+			JarFileCreator jarFileCreator = new JarFileCreator(validJar);
+			jarFileCreator.addClass(ClassLoaderUtilsTest.class);
+			jarFileCreator.createJarFile();
+
+			// validate that the JAR is correct and the test setup is not broken
+			try {
+				new JarFile(validJar.getAbsolutePath()).close(); // close again so the temp file can be deleted
+			}
+			catch (Exception e) {
+				e.printStackTrace();
+				fail("test setup broken: cannot create a valid jar file");
+			}
+
+			// file with some random contents
+			invalidJar = File.createTempFile("flink-url-test", ".tmp");
+			try (FileOutputStream invalidout = new FileOutputStream(invalidJar)) {
+				invalidout.write(new byte[] { -1, 1, -2, 3, -3, 4, });
+			}
+
+			// non existing file
+			File nonExisting = File.createTempFile("flink-url-test", ".tmp");
+			assertTrue("Cannot create and delete temp file", nonExisting.delete());
+
+
+			// create a URL classloader with
+			// - a HTTP URL
+			// - a file URL for an existing jar file
+			// - a file URL for an existing file that is not a jar file
+			// - a file URL for a non-existing file
+
+			URL[] urls = {
+				new URL("http", "localhost", 26712, "/some/file/path"),
+				new URL("file", null, validJar.getAbsolutePath()),
+				new URL("file", null, invalidJar.getAbsolutePath()),
+				new URL("file", null, nonExisting.getAbsolutePath()),
+			};
+
+			URLClassLoader loader = new URLClassLoader(urls, getClass().getClassLoader());
+			String info = ClassLoaderUtil.getUserCodeClassLoaderInfo(loader);
+
+			assertTrue(info.indexOf("/some/file/path") > 0);
+			assertTrue(info.indexOf(validJar.getAbsolutePath() + "' (valid") > 0);
+			assertTrue(info.indexOf(invalidJar.getAbsolutePath() + "' (invalid JAR") > 0);
+			assertTrue(info.indexOf(nonExisting.getAbsolutePath() + "' (missing") > 0);
+
+			System.out.println(info);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (validJar != null) {
+				//noinspection ResultOfMethodCallIgnored
+				validJar.delete();
+			}
+			if (invalidJar != null) {
+				//noinspection ResultOfMethodCallIgnored
+				invalidJar.delete();
+			}
+		}
+	}
+	
+	@Test
+	public void testWithAppClassLoader() {
+		try {
+			// the info string for the system class loader must identify it as such
+			String result = ClassLoaderUtil.getUserCodeClassLoaderInfo(ClassLoader.getSystemClassLoader());
+			assertTrue(result.toLowerCase().contains("system classloader"));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testInvalidClassLoaders() {
+		try {
+			// a 'null' loader must still yield a non-null info string rather than an exception
+			assertNotNull(ClassLoaderUtil.getUserCodeClassLoaderInfo(null));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/eeec1912/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 62735af..a8486d3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.util.ClassLoaderUtil;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -198,12 +199,26 @@ public class StreamConfig implements Serializable {
 			}
 		}
 	}
-
-	@SuppressWarnings({ "unchecked" })
+	
 	public <T> T getStreamOperator(ClassLoader cl) {
 		try {
-			return (T) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
-		} catch (Exception e) {
+			@SuppressWarnings("unchecked")
+			T result = (T) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
+			return result;
+		}
+		catch (ClassNotFoundException e) {
+			String classLoaderInfo = ClassLoaderUtil.getUserCodeClassLoaderInfo(cl);
+			boolean loadableDoubleCheck = ClassLoaderUtil.validateClassLoadable(e, cl);
+
+			String exceptionMessage = "Cannot load user class: " + e.getMessage()
+					+ "\nClassLoader info: " + classLoaderInfo + "\n" + // keep the verdict on its own line
+					(loadableDoubleCheck ? 
+							"Class was actually found in classloader - deserialization issue." :
+							"Class not resolveable through given classloader.");
+
+			throw new StreamTaskException(exceptionMessage, e); // preserve the original cause and its stack trace
+		}
+		catch (Exception e) {
 			throw new StreamTaskException("Cannot instantiate user function.", e);
 		}
 	}


Mime
View raw message