apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXCORE-767) Duplicate class loading in CLI for single application launch
Date Sun, 19 Nov 2017 02:27:00 GMT

    [ https://issues.apache.org/jira/browse/APEXCORE-767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16258299#comment-16258299 ]

ASF GitHub Bot commented on APEXCORE-767:
-----------------------------------------

tweise closed pull request #563: APEXCORE-767.set parent classloader in StramAppLauncher.loadDependencies
URL: https://github.com/apache/apex-core/pull/563
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
index 2451b1093d..a4ef3f5b94 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -2151,7 +2151,7 @@ public void write(int b)
         } else {
           System.err.println("No application specified.");
         }
-
+        submitApp.resetContextClassLoader();
       } finally {
         IOUtils.closeQuietly(cp);
       }
@@ -2911,8 +2911,10 @@ public void execute(String[] args, ConsoleReader reader) throws Exception
           submitApp.loadDependencies();
           List<AppFactory> matchingAppFactories = getMatchingAppFactories(submitApp,
appName, commandLineInfo.exactMatch);
           if (matchingAppFactories == null || matchingAppFactories.isEmpty()) {
+            submitApp.resetContextClassLoader();
             throw new CliException("No application in jar file matches '" + appName + "'");
           } else if (matchingAppFactories.size() > 1) {
+            submitApp.resetContextClassLoader();
             throw new CliException("More than one application in jar file match '" + appName
+ "'");
           } else {
             Map<String, Object> map = new HashMap<>();
@@ -2940,6 +2942,7 @@ public void write(int b)
               }
             }
             printJson(map);
+            submitApp.resetContextClassLoader();
           }
         } else {
           if (filename.endsWith(".json")) {
@@ -2971,6 +2974,7 @@ public void write(int b)
               appList.add(m);
             }
             printJson(appList, "applications");
+            submitApp.resetContextClassLoader();
           }
         }
       } else {
@@ -3200,8 +3204,10 @@ public void execute(String[] args, ConsoleReader reader) throws Exception
         submitApp.loadDependencies();
         List<AppFactory> matchingAppFactories = getMatchingAppFactories(submitApp,
appName, true);
         if (matchingAppFactories == null || matchingAppFactories.isEmpty()) {
+          submitApp.resetContextClassLoader();
           throw new CliException("No application in jar file matches '" + appName + "'");
         } else if (matchingAppFactories.size() > 1) {
+          submitApp.resetContextClassLoader();
           throw new CliException("More than one application in jar file match '" + appName
+ "'");
         } else {
           AppFactory appFactory = matchingAppFactories.get(0);
@@ -3211,6 +3217,7 @@ public void execute(String[] args, ConsoleReader reader) throws Exception
             file.createNewFile();
           }
           LogicalPlanSerializer.convertToProperties(logicalPlan).save(file);
+          submitApp.resetContextClassLoader();
         }
       } else {
         if (currentApp == null) {
diff --git a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
index a606b0668a..5e03438db1 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
@@ -465,8 +465,9 @@ public void processAppDirectory(boolean skipJars)
 
       if (entry.getName().endsWith(".jar") && !skipJars) {
         appJars.add(entry.getName());
+        StramAppLauncher stramAppLauncher = null;
         try {
-          StramAppLauncher stramAppLauncher = new StramAppLauncher(entry, config);
+          stramAppLauncher = new StramAppLauncher(entry, config);
           stramAppLauncher.loadDependencies();
           List<AppFactory> appFactories = stramAppLauncher.getBundledTopologies();
           for (AppFactory appFactory : appFactories) {
@@ -486,6 +487,10 @@ public void processAppDirectory(boolean skipJars)
           }
         } catch (Exception ex) {
           LOG.error("Caught exception trying to process {}", entry.getName(), ex);
+        } finally {
+          if (stramAppLauncher != null) {
+            stramAppLauncher.resetContextClassLoader();
+          }
         }
       }
     }
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
index d4f0170f3c..2019f489f0 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
@@ -106,6 +106,9 @@
   private LinkedHashSet<File> deployJars;
   private final StringWriter mvnBuildClasspathOutput = new StringWriter();
 
+  private ClassLoader initialClassLoader;
+  private Thread loaderThread;
+
   public interface AppFactory
   {
     LogicalPlan createApp(LogicalPlanConfiguration conf);
@@ -220,7 +223,6 @@ public String getDisplayName()
     }
   }
 
-
   public StramAppLauncher(File appJarFile, Configuration conf) throws Exception
   {
     this.jarFile = appJarFile;
@@ -535,10 +537,32 @@ public void runLocal(AppFactory appConfig) throws Exception
 
   public URLClassLoader loadDependencies()
   {
-    URLClassLoader cl = URLClassLoader.newInstance(launchDependencies.toArray(new URL[launchDependencies.size()]));
-    Thread.currentThread().setContextClassLoader(cl);
-    StringCodecs.check();
-    return cl;
+    if (this.loaderThread == null && this.initialClassLoader == null) {
+      this.loaderThread = Thread.currentThread();
+      this.initialClassLoader = Thread.currentThread().getContextClassLoader();
+    }
+
+    if (Thread.currentThread() != this.loaderThread) {
+      throw new RuntimeException("Calls to loadDependencies can only be made on the same
thread that loadDependencies was called on for the first time");
+    } else {
+      URL[] dependencies = launchDependencies.toArray(new URL[launchDependencies.size()]);
+
+      ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
+      URLClassLoader cl = URLClassLoader.newInstance(dependencies, currentContextClassLoader);
+      Thread.currentThread().setContextClassLoader(cl);
+
+      StringCodecs.check();
+      return cl;
+    }
+  }
+
+  public void resetContextClassLoader()
+  {
+    if (Thread.currentThread() != this.loaderThread) {
+      throw new RuntimeException("Calls to resetContextClassLoader can only be made on the
same thread that loadDependencies was called on for the first time");
+    }
+
+    Thread.currentThread().setContextClassLoader(initialClassLoader);
   }
 
   private void setTokenRefreshCredentials(LogicalPlan dag, Configuration conf) throws IOException
diff --git a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
index 9a69b081b7..7b43ab5fa0 100644
--- a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
+++ b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
@@ -80,6 +80,7 @@ public LogicalPlan createApp(LogicalPlanConfiguration planConfig)
         }
       };
       ApplicationId appId = appLauncher.launchApp(appFactory);
+      appLauncher.resetContextClassLoader();
       return new YarnAppHandleImpl(appId, conf);
     } catch (Exception ex) {
       throw new LauncherException(ex);
diff --git a/engine/src/test/java/com/datatorrent/stram/client/AsyncTester.java b/engine/src/test/java/com/datatorrent/stram/client/AsyncTester.java
new file mode 100644
index 0000000000..e8d692e8f4
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/client/AsyncTester.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.client;
+
+// See https://stackoverflow.com/a/2596530
+public class AsyncTester
+{
+  private Thread thread;
+  private volatile AssertionError error;
+
+  public AsyncTester(final Runnable runnable)
+  {
+    thread = new Thread(new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        try {
+          runnable.run();
+        } catch (AssertionError e) {
+          error = e;
+        }
+      }
+    });
+  }
+
+  public AsyncTester start()
+  {
+    thread.start();
+    return this;
+  }
+
+  public void test() throws AssertionError, InterruptedException
+  {
+    thread.join();
+    if (error != null) {
+      throw error;
+    }
+  }
+}
diff --git a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
index 2069bab553..e31a6dda1f 100644
--- a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java
@@ -20,6 +20,8 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URL;
+import java.util.LinkedHashSet;
 
 import org.junit.Assert;
 import org.junit.Rule;
@@ -53,6 +55,141 @@
 
   private static final String SET_TOKEN_REFRESH_CREDENTIALS_METHOD = "setTokenRefreshCredentials";
 
+  @PrepareForTest({StramAppLauncher.class})
+  @PowerMockIgnore({"javax.xml.*", "org.w3c.*", "org.apache.hadoop.*", "org.apache.log4j.*"})
+  public static class LoadDependenciesTest
+  {
+
+    @Rule
+    public PowerMockRule rule = new PowerMockRule();
+
+    @Rule
+    public TestWatcher setup = new TestWatcher()
+    {
+      @Override
+      protected void starting(Description description)
+      {
+        super.starting(description);
+        suppress(method(StramAppLauncher.class, "init"));
+      }
+    };
+
+    @Test
+    public void testLoadDependenciesSetsParentClassLoader() throws Exception
+    {
+      // Setup
+      Configuration conf = new Configuration();
+      FileSystem fs = FileSystem.newInstance(conf);
+      StramAppLauncher appLauncher = new StramAppLauncher(fs, conf);
+
+      Whitebox.setInternalState(appLauncher, "launchDependencies", new LinkedHashSet<URL>());
+
+      // Get initial contextClassLoader
+      ClassLoader initialClassLoader = Thread.currentThread().getContextClassLoader();
+
+      appLauncher.loadDependencies();
+
+      // Make sure that new contextClassLoader has initialClassLoader as parent
+      ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
+
+      Assert.assertSame(initialClassLoader, currentClassLoader.getParent());
+    }
+
+    @Test
+    public void testResetContextClassLoaderResetsToInitialClassLoader() throws Exception
+    {
+      // Setup
+      Configuration conf = new Configuration();
+      FileSystem fs = FileSystem.newInstance(conf);
+      StramAppLauncher appLauncher = new StramAppLauncher(fs, conf);
+
+      Whitebox.setInternalState(appLauncher, "launchDependencies", new LinkedHashSet<URL>());
+
+      // Get initial contextClassLoader
+      ClassLoader initialClassLoader = Thread.currentThread().getContextClassLoader();
+
+      appLauncher.loadDependencies();
+      appLauncher.resetContextClassLoader();
+
+      ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
+      Assert.assertSame(initialClassLoader, currentClassLoader);
+    }
+
+    @Test
+    public void testResetContextClassloaderOnlyOnInitialThread() throws Exception
+    {
+      // Setup
+      Configuration conf = new Configuration();
+      FileSystem fs = FileSystem.newInstance(conf);
+      final StramAppLauncher appLauncher = new StramAppLauncher(fs, conf);
+      Whitebox.setInternalState(appLauncher, "launchDependencies", new LinkedHashSet<URL>());
+
+      new AsyncTester(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          try {
+            appLauncher.loadDependencies();
+          } catch (Exception e) {
+            Assert.fail(e.getMessage());
+          }
+        }
+      }).start().test();
+
+      new AsyncTester(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          try {
+            appLauncher.resetContextClassLoader();
+            Assert.fail("An exception should be thrown");
+          } catch (RuntimeException e) {
+            // catch as expected
+          }
+        }
+      }).start().test();
+    }
+
+    @Test
+    public void testLoadDependenciesOnlyOnInitialThread() throws Exception
+    {
+      // Setup
+      Configuration conf = new Configuration();
+      FileSystem fs = FileSystem.newInstance(conf);
+      final StramAppLauncher appLauncher = new StramAppLauncher(fs, conf);
+      Whitebox.setInternalState(appLauncher, "launchDependencies", new LinkedHashSet<URL>());
+
+      new AsyncTester(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          try {
+            appLauncher.loadDependencies();
+          } catch (Exception e) {
+            Assert.fail(e.getMessage());
+          }
+        }
+      }).start().test();
+
+      new AsyncTester(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          try {
+            appLauncher.loadDependencies();
+            Assert.fail("An exception should be thrown");
+          } catch (RuntimeException e) {
+            // catch as expected
+          }
+        }
+      }).start().test();
+    }
+  }
+
   @PrepareForTest({StramAppLauncher.class})
   @PowerMockIgnore({"javax.xml.*", "org.w3c.*", "org.apache.hadoop.*", "org.apache.log4j.*"})
   public static class RefreshTokenTests


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Duplicate class loading in CLI for single application launch 
> -------------------------------------------------------------
>
>                 Key: APEXCORE-767
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-767
>             Project: Apache Apex Core
>          Issue Type: Bug
>            Reporter: Thomas Weise
>            Assignee: Florian Schmidt
>
> StramAppLauncher will create multiple class loaders in a way that leads to duplicate
> loading of the same class (while listing app package, then when launching). It also appears
> that the context class loader is never reset.
>  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message