Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1067E18EFF for ; Wed, 12 Aug 2015 08:06:03 +0000 (UTC) Received: (qmail 85824 invoked by uid 500); 12 Aug 2015 08:06:02 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 85784 invoked by uid 500); 12 Aug 2015 08:06:02 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 85775 invoked by uid 99); 12 Aug 2015 08:06:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Aug 2015 08:06:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 92E10E10A2; Wed, 12 Aug 2015 08:06:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Message-Id: <8716295d51114bf39466d623fc4646a1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-2509] [runtime] Add class loader info into the exception message when user code classes are not found. Date: Wed, 12 Aug 2015 08:06:02 +0000 (UTC) Repository: flink Updated Branches: refs/heads/master d8d074809 -> eeec1912b [FLINK-2509] [runtime] Add class loader info into the exception message when user code classes are not found. This closes #1008 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eeec1912 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eeec1912 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eeec1912 Branch: refs/heads/master Commit: eeec1912b478ed43a045449d82e0a2fd3700d720 Parents: d8d0748 Author: Stephan Ewen Authored: Tue Aug 11 16:07:22 2015 +0200 Committer: Stephan Ewen Committed: Wed Aug 12 10:05:13 2015 +0200 ---------------------------------------------------------------------- .../flink/runtime/util/ClassLoaderUtil.java | 126 ++++++++++++++++++ .../runtime/util/ClassLoaderUtilsTest.java | 131 +++++++++++++++++++ .../flink/streaming/api/graph/StreamConfig.java | 23 +++- 3 files changed, 276 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/eeec1912/flink-runtime/src/main/java/org/apache/flink/runtime/util/ClassLoaderUtil.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ClassLoaderUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ClassLoaderUtil.java new file mode 100644 index 0000000..fbb707e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ClassLoaderUtil.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.util.ExceptionUtils; + +import java.io.File; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.jar.JarFile; + +/** + * Utilities for information with respect to class loaders, specifically class loaders for + * the dynamic loading of user defined classes. + */ +public class ClassLoaderUtil { + + /** + * Gets information about URL class loaders. The returned info string contains all URLs of the + * class loader. For file URLs, it contains in addition whether the referenced file exists, + * is a valid JAR file, or is a directory. + * + *

NOTE: This method makes a best effort to provide information about the classloader, and + * never throws an exception.

+ * + * @param loader The classloader to get the info string for. + * @return The classloader information string. + */ + public static String getUserCodeClassLoaderInfo(ClassLoader loader) { + if (loader instanceof URLClassLoader) { + URLClassLoader cl = (URLClassLoader) loader; + + try { + StringBuilder bld = new StringBuilder(); + + if (cl == ClassLoader.getSystemClassLoader()) { + bld.append("System ClassLoader: "); + } + else { + bld.append("URL ClassLoader:"); + } + + for (URL url : cl.getURLs()) { + bld.append("\n "); + if (url == null) { + bld.append("(null)"); + } + else if ("file".equals(url.getProtocol())) { + String filePath = url.getPath(); + File fileFile = new File(filePath); + + bld.append("file: '").append(filePath).append('\''); + + if (fileFile.exists()) { + if (fileFile.isDirectory()) { + bld.append(" (directory)"); + } + else { + try { + new JarFile(filePath); + bld.append(" (valid JAR)"); + } + catch (Exception e) { + bld.append(" (invalid JAR: ").append(e.getMessage()).append(')'); + } + } + } + else { + bld.append(" (missing)"); + } + } + else { + bld.append("url: ").append(url); + } + } + + return bld.toString(); + } + catch (Throwable t) { + return "Cannot access classloader info due to an exception.\n" + + ExceptionUtils.stringifyException(t); + } + } + else { + return "No user code ClassLoader"; + } + } + + /** + * Checks, whether the class that was not found in the given exception, can be resolved through + * the given class loader. + * + * @param cnfe The ClassNotFoundException that defines the name of the class. + * @param cl The class loader to use for the class resolution. + * @return True, if the class can be resolved with the given class loader, false if not. + */ + public static boolean validateClassLoadable(ClassNotFoundException cnfe, ClassLoader cl) { + try { + String className = cnfe.getMessage(); + Class.forName(className, false, cl); + return true; + } + catch (ClassNotFoundException e) { + return false; + } + catch (Exception e) { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/eeec1912/flink-runtime/src/test/java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java new file mode 100644 index 0000000..d5f3c9e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.jar.JarFile; + +/** + * Tests that validate the {@link ClassLoaderUtil}. + */ +public class ClassLoaderUtilsTest { + + @Test + public void testWithURLClassLoader() { + File validJar = null; + File invalidJar = null; + + try { + // file with jar contents + validJar = File.createTempFile("flink-url-test", ".tmp"); + JarFileCreator jarFileCreator = new JarFileCreator(validJar); + jarFileCreator.addClass(ClassLoaderUtilsTest.class); + jarFileCreator.createJarFile(); + + // validate that the JAR is correct and the test setup is not broken + try { + new JarFile(validJar.getAbsolutePath()); + } + catch (Exception e) { + e.printStackTrace(); + fail("test setup broken: cannot create a valid jar file"); + } + + // file with some random contents + invalidJar = File.createTempFile("flink-url-test", ".tmp"); + try (FileOutputStream invalidout = new FileOutputStream(invalidJar)) { + invalidout.write(new byte[] { -1, 1, -2, 3, -3, 4, }); + } + + // non existing file + File nonExisting = File.createTempFile("flink-url-test", ".tmp"); + assertTrue("Cannot create and delete temp file", nonExisting.delete()); + + + // create a URL classloader with + // - a HTTP URL + // - a file URL for an existing jar file + // - a file URL for an existing file that is not a jar file + // - a file URL for a non-existing file + + URL[] urls = { + new URL("http", "localhost", 26712, "/some/file/path"), + new URL("file", null, validJar.getAbsolutePath()), + new URL("file", null, invalidJar.getAbsolutePath()), + new URL("file", null, nonExisting.getAbsolutePath()), + }; + + URLClassLoader loader = new URLClassLoader(urls, getClass().getClassLoader()); + String info = ClassLoaderUtil.getUserCodeClassLoaderInfo(loader); + + assertTrue(info.indexOf("/some/file/path") > 0); + assertTrue(info.indexOf(validJar.getAbsolutePath() + "' (valid") > 0); + assertTrue(info.indexOf(invalidJar.getAbsolutePath() + "' (invalid JAR") > 0); + assertTrue(info.indexOf(nonExisting.getAbsolutePath() + "' (missing") > 0); + + System.out.println(info); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + if (validJar != null) { + //noinspection ResultOfMethodCallIgnored + validJar.delete(); + } + if (invalidJar != null) { + //noinspection ResultOfMethodCallIgnored + invalidJar.delete(); + } + } + } + + @Test + public void testWithAppClassLoader() { + try { + // must return something when invoked with 'null' + String result = ClassLoaderUtil.getUserCodeClassLoaderInfo(ClassLoader.getSystemClassLoader()); + assertTrue(result.toLowerCase().contains("system classloader")); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testInvalidClassLoaders() { + try { + // must return something when invoked with 'null' + assertNotNull(ClassLoaderUtil.getUserCodeClassLoaderInfo(null)); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/eeec1912/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 62735af..a8486d3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.util.ClassLoaderUtil; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper; import org.apache.flink.streaming.api.operators.StreamOperator; @@ -198,12 +199,26 @@ public class StreamConfig implements Serializable { } } } - - @SuppressWarnings({ "unchecked" }) + public T getStreamOperator(ClassLoader cl) { try { - return (T) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl); - } catch (Exception e) { + @SuppressWarnings("unchecked") + T result = (T) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl); + return result; + } + catch (ClassNotFoundException e) { + String classLoaderInfo = ClassLoaderUtil.getUserCodeClassLoaderInfo(cl); + boolean loadableDoubleCheck = ClassLoaderUtil.validateClassLoadable(e, cl); + + String exceptionMessage = "Cannot load user class: " + e.getMessage() + + "\nClassLoader info: " + classLoaderInfo + + (loadableDoubleCheck ? + "Class was actually found in classloader - deserialization issue." : + "Class not resolveable through given classloader."); + + throw new StreamTaskException(exceptionMessage); + } + catch (Exception e) { throw new StreamTaskException("Cannot instantiate user function.", e); } }