impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbap...@apache.org
Subject [1/5] incubator-impala git commit: IMPALA-3983/IMPALA-3974: Delete function jar resources after load
Date Fri, 04 Nov 2016 17:56:07 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master 1af0aa4ad -> 32294220c


IMPALA-3983/IMPALA-3974: Delete function jar resources after load

The Catalog copies the UDF jar files to the local file system to
load the Java UDF classes for validation purposes. However, we
do not clean them up after the UDF load, and hence on a deployment
with a large number of functions registered, these jars can accumulate
over a period of time and can fill up the tmp space. We fix it by
deleting the jar resource once the function is loaded.

Also, this patch switches to --local_library_dir for copying these
temporary jars instead of using the path from java.io.tmpdir.

Change-Id: I5f9dedb5b342415380c83e61a72eb497371a8199
Reviewed-on: http://gerrit.cloudera.org:8080/4617
Reviewed-by: Bharath Vissapragada <bharathv@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c03cfe51
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c03cfe51
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c03cfe51

Branch: refs/heads/master
Commit: c03cfe51fc6640a4b5d0587582123b56b7575888
Parents: 1af0aa4
Author: Bharath Vissapragada <bharathv@cloudera.com>
Authored: Mon Oct 3 14:34:59 2016 -0700
Committer: Internal Jenkins <cloudera-hudson@gerrit.cloudera.org>
Committed: Fri Nov 4 00:07:18 2016 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog.cc                            |  7 +++++--
 .../apache/impala/catalog/CatalogServiceCatalog.java | 15 +++++++++------
 .../org/apache/impala/common/FileSystemUtil.java     | 13 +++++++++++++
 .../java/org/apache/impala/service/JniCatalog.java   |  6 ++++--
 .../impala/testutil/CatalogServiceTestCatalog.java   |  3 ++-
 tests/custom_cluster/test_permanent_udfs.py          |  9 +++++++++
 6 files changed, 42 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c03cfe51/be/src/catalog/catalog.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index 44771c8..9e7af4e 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -30,6 +30,7 @@ using namespace impala;
 
 DECLARE_bool(load_auth_to_local_rules);
 DECLARE_string(principal);
+DECLARE_string(local_library_dir);
 
 DEFINE_bool(load_catalog_in_background, false,
     "If true, loads catalog metadata in the background. If false, metadata is loaded "
@@ -44,7 +45,8 @@ DECLARE_int32(non_impala_java_vlog);
 
 Catalog::Catalog() {
   JniMethodDescriptor methods[] = {
-    {"<init>", "(ZILjava/lang/String;IIZLjava/lang/String;)V", &catalog_ctor_},
+    {"<init>", "(ZILjava/lang/String;IIZLjava/lang/String;Ljava/lang/String;)V",
+        &catalog_ctor_},
     {"updateCatalog", "([B)[B", &update_metastore_id_},
     {"execDdl", "([B)[B", &exec_ddl_id_},
     {"resetMetadata", "([B)[B", &reset_metadata_id_},
@@ -74,10 +76,11 @@ Catalog::Catalog() {
   // and impala is kerberized.
   jboolean auth_to_local = FLAGS_load_auth_to_local_rules && !FLAGS_principal.empty();
   jstring principal = jni_env->NewStringUTF(FLAGS_principal.c_str());
+  jstring local_library_dir = jni_env->NewStringUTF(FLAGS_local_library_dir.c_str());
   jobject catalog = jni_env->NewObject(catalog_class_, catalog_ctor_,
       load_in_background, num_metadata_loading_threads, sentry_config,
       FlagToTLogLevel(FLAGS_v), FlagToTLogLevel(FLAGS_non_impala_java_vlog),
-      auth_to_local, principal);
+      auth_to_local, principal, local_library_dir);
   EXIT_IF_EXC(jni_env);
   ABORT_IF_ERROR(JniUtil::LocalToGlobalRef(jni_env, catalog, &catalog_));
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c03cfe51/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 10f6e4c..f56f502 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -145,15 +145,15 @@ public class CatalogServiceCatalog extends Catalog {
   private final SentryProxy sentryProxy_;
 
   // Local temporary directory to copy UDF Jars.
-  private static final String LOCAL_LIBRARY_PATH = new String("file://" +
-      System.getProperty("java.io.tmpdir"));
+  private static String localLibraryPath_;
 
   /**
    * Initialize the CatalogServiceCatalog. If loadInBackground is true, table metadata
    * will be loaded in the background
    */
   public CatalogServiceCatalog(boolean loadInBackground, int numLoadingThreads,
-      SentryConfig sentryConfig, TUniqueId catalogServiceId, String kerberosPrincipal) {
+      SentryConfig sentryConfig, TUniqueId catalogServiceId, String kerberosPrincipal,
+      String localLibraryPath) {
     super(true);
     catalogServiceId_ = catalogServiceId;
     tableLoadingMgr_ = new TableLoadingMgr(this, numLoadingThreads);
@@ -173,6 +173,7 @@ public class CatalogServiceCatalog extends Catalog {
     } else {
       sentryProxy_ = null;
     }
+    localLibraryPath_ = new String("file://" + localLibraryPath);
   }
 
   /**
@@ -428,7 +429,7 @@ public class CatalogServiceCatalog extends Catalog {
   /**
    * Returns a list of Impala Functions, one per compatible "evaluate" method in the UDF
    * class referred to by the given Java function. This method copies the UDF Jar
-   * referenced by "function" to a temporary file in "LOCAL_LIBRARY_PATH" and loads it
+   * referenced by "function" to a temporary file in localLibraryPath_ and loads it
    * into the jvm. Then we scan all the methods in the class using reflection and extract
    * those methods and create corresponding Impala functions. Currently Impala supports
    * only "JAR" files for symbols and also a single Jar containing all the dependent
@@ -447,9 +448,9 @@ public class CatalogServiceCatalog extends Catalog {
     }
     String jarUri = function.getResourceUris().get(0).getUri();
     Class<?> udfClass = null;
+    Path localJarPath = null;
     try {
-      Path localJarPath = new Path(LOCAL_LIBRARY_PATH,
-          UUID.randomUUID().toString() + ".jar");
+      localJarPath = new Path(localLibraryPath_, UUID.randomUUID().toString() + ".jar");
       try {
         FileSystemUtil.copyToLocal(new Path(jarUri), localJarPath);
       } catch (IOException e) {
@@ -501,6 +502,8 @@ public class CatalogServiceCatalog extends Catalog {
           function.getFunctionName();
       LOG.error(errorMsg);
       throw new ImpalaRuntimeException(errorMsg, e);
+    } finally {
+      if (localJarPath != null) FileSystemUtil.deleteIfExists(localJarPath);
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c03cfe51/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index d9fd6e8..c81e27f 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -377,6 +377,19 @@ public class FileSystemUtil {
   }
 
   /**
+   * Delete the file at 'path' if it exists.
+   */
+  public static void deleteIfExists(Path path) {
+    try {
+      FileSystem fs = path.getFileSystem(CONF);
+      if (!fs.exists(path)) return;
+      fs.delete(path);
+    } catch (IOException e) {
+      LOG.warn("Encountered an exception deleting file at path " + path.toString(), e);
+    }
+  }
+
+  /**
    * Returns true if the given path is a location which supports caching (e.g. HDFS).
    */
   public static boolean isPathCacheable(Path path) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c03cfe51/fe/src/main/java/org/apache/impala/service/JniCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index 7d0af54..b35877f 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -81,7 +81,8 @@ public class JniCatalog {
 
   public JniCatalog(boolean loadInBackground, int numMetadataLoadingThreads,
       String sentryServiceConfig, int impalaLogLevel, int otherLogLevel,
-      boolean allowAuthToLocal, String kerberosPrincipal) throws InternalException {
+      boolean allowAuthToLocal, String kerberosPrincipal, String localLibraryPath)
+      throws InternalException {
     BackendConfig.setAuthToLocal(allowAuthToLocal);
     Preconditions.checkArgument(numMetadataLoadingThreads > 0);
     // This trick saves having to pass a TLogLevel enum, which is an object and more
@@ -98,7 +99,8 @@ public class JniCatalog {
     LOG.info(JniUtil.getJavaVersion());
 
     catalog_ = new CatalogServiceCatalog(loadInBackground,
-        numMetadataLoadingThreads, sentryConfig, getServiceId(), kerberosPrincipal);
+        numMetadataLoadingThreads, sentryConfig, getServiceId(), kerberosPrincipal,
+        localLibraryPath);
     try {
       catalog_.reset();
     } catch (CatalogException e) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c03cfe51/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
index 92b127f..c364ba5 100644
--- a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
@@ -31,7 +31,8 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
 
   public CatalogServiceTestCatalog(boolean loadInBackground, int numLoadingThreads,
       SentryConfig sentryConfig, TUniqueId catalogServiceId) {
-    super(loadInBackground, numLoadingThreads, sentryConfig, catalogServiceId, null);
+    super(loadInBackground, numLoadingThreads, sentryConfig, catalogServiceId, null,
+        System.getProperty("java.io.tmpdir"));
 
     // Cache pools are typically loaded asynchronously, but as there is no fixed execution
     // order for tests, the cache pools are loaded synchronously before the tests are

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c03cfe51/tests/custom_cluster/test_permanent_udfs.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_permanent_udfs.py b/tests/custom_cluster/test_permanent_udfs.py
index c979cd1..0170b8e 100644
--- a/tests/custom_cluster/test_permanent_udfs.py
+++ b/tests/custom_cluster/test_permanent_udfs.py
@@ -15,10 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import glob
 import os
 import pytest
+import shutil
 import subprocess
 
+from tempfile import mkdtemp
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal
 from tests.common.test_dimensions import create_uncompressed_text_dimension
@@ -33,6 +36,7 @@ class TestUdfPersistence(CustomClusterTestSuite):
   HIVE_IMPALA_INTEGRATION_DB = 'hive_impala_integration_db'
   HIVE_UDF_JAR = os.getenv('DEFAULT_FS') + '/test-warehouse/hive-exec.jar';
   JAVA_UDF_JAR = os.getenv('DEFAULT_FS') + '/test-warehouse/impala-hive-udfs.jar';
+  LOCAL_LIBRARY_DIR = mkdtemp(dir="/tmp")
 
   @classmethod
   def get_workload(cls):
@@ -76,6 +80,7 @@ class TestUdfPersistence(CustomClusterTestSuite):
     self.client.execute("DROP DATABASE IF EXISTS %s CASCADE" % self.JAVA_FN_TEST_DB)
     self.client.execute("DROP DATABASE IF EXISTS %s CASCADE"
        % self.HIVE_IMPALA_INTEGRATION_DB)
+    shutil.rmtree(self.LOCAL_LIBRARY_DIR, ignore_errors=True)
 
   def run_stmt_in_hive(self, stmt):
     """
@@ -176,6 +181,8 @@ class TestUdfPersistence(CustomClusterTestSuite):
   @SkipIfS3.hive
   @SkipIfLocal.hive
   @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+     catalogd_args= "--local_library_dir=%s" % LOCAL_LIBRARY_DIR)
   def test_java_udfs_hive_integration(self):
     ''' This test checks the integration between Hive and Impala on
     CREATE FUNCTION and DROP FUNCTION statements for persistent Java UDFs.
@@ -225,6 +232,8 @@ class TestUdfPersistence(CustomClusterTestSuite):
     self.client.execute("INVALIDATE METADATA")
     self.verify_function_count(
             "SHOW FUNCTIONS in {0}".format(self.HIVE_IMPALA_INTEGRATION_DB), 0)
+    # Make sure we deleted all the temporary jars we copied to the local fs
+    assert len(glob.glob(self.LOCAL_LIBRARY_DIR + "/*.jar")) == 0
 
   @pytest.mark.execute_serially
   def test_java_udfs_from_impala(self):


Mime
View raw message