impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject [1/4] incubator-impala git commit: IMPALA-5259: Add REFRESH FUNCTIONS <db> statement
Date Thu, 25 May 2017 15:48:38 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master b4343895d -> b27827744


IMPALA-5259: Add REFRESH FUNCTIONS <db> statement

Before this patch, Impala relied on INVALIDATE METADATA to load
externally added UDFs from HMS. The problem with this approach is that
INVALIDATE METADATA affects all databases and tables in the entire
cluster.

In this patch, we add a REFRESH FUNCTIONS <db> statement that reloads
the functions of a database from HMS. We return a list of updated and
removed db functions to the issuing Impalad in order to update its
local catalog cache.

Testing:
- Ran a private build which passed.

Change-Id: I3625c88bb51cca833f3293c224d3f0feb00e6e0b
Reviewed-on: http://gerrit.cloudera.org:8080/6878
Reviewed-by: Taras Bobrovytsky <tbobrovytsky@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/24ff0f2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/24ff0f2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/24ff0f2f

Branch: refs/heads/master
Commit: 24ff0f2fc21f0111f4c4fa9cbab40105e1e7ee01
Parents: b434389
Author: Taras Bobrovytsky <tbobrovytsky@cloudera.com>
Authored: Mon May 8 13:18:10 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Thu May 25 03:30:03 2017 +0000

----------------------------------------------------------------------
 common/thrift/CatalogService.thrift             |   3 +
 fe/src/main/cup/sql-parser.cup                  |  10 +-
 .../impala/analysis/ResetMetadataStmt.java      |  61 +++++--
 .../impala/catalog/CatalogServiceCatalog.java   |  63 ++++++++
 .../main/java/org/apache/impala/catalog/Db.java |   6 +
 .../org/apache/impala/catalog/Function.java     |   9 ++
 .../apache/impala/catalog/ImpaladCatalog.java   |   3 +
 .../impala/service/CatalogOpExecutor.java       |  30 ++--
 .../org/apache/impala/service/Frontend.java     |  18 +--
 .../org/apache/impala/analysis/ParserTest.java  |   2 +
 .../org/apache/impala/analysis/ToSqlTest.java   |  21 +++
 tests/custom_cluster/test_permanent_udfs.py     | 160 ++++++++++++++++---
 12 files changed, 323 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/common/thrift/CatalogService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 188f71a..95ac7e9 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -193,6 +193,9 @@ struct TResetMetadataRequest {
   // If set, refreshes the specified partition, otherwise
   // refreshes the whole table
   5: optional list<CatalogObjects.TPartitionKeyValue> partition_spec
+
+  // If set, refreshes functions in the specified database.
+  6: optional string db_name
 }
 
 // Response from TResetMetadataRequest

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/main/cup/sql-parser.cup
----------------------------------------------------------------------
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index ebc7804..b88d7d0 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -661,13 +661,15 @@ overwrite_val ::=
 
 reset_metadata_stmt ::=
   KW_INVALIDATE KW_METADATA
-  {: RESULT = new ResetMetadataStmt(null, false, null); :}
+  {: RESULT = ResetMetadataStmt.createInvalidateStmt(null); :}
   | KW_INVALIDATE KW_METADATA table_name:table
-  {: RESULT = new ResetMetadataStmt(table, false, null); :}
+  {: RESULT = ResetMetadataStmt.createInvalidateStmt(table); :}
   | KW_REFRESH table_name:table
-  {: RESULT = new ResetMetadataStmt(table, true, null); :}
+  {: RESULT = ResetMetadataStmt.createRefreshTableStmt(table, null); :}
   | KW_REFRESH table_name:table partition_spec:partition
-  {: RESULT = new ResetMetadataStmt(table, true, partition); :}
+  {: RESULT = ResetMetadataStmt.createRefreshTableStmt(table, partition); :}
+  | KW_REFRESH KW_FUNCTIONS ident_or_default:db
+  {: RESULT = ResetMetadataStmt.createRefreshFunctionsStmt(db); :}
   ;
 
 explain_stmt ::=

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
index 6f6a3f1..ac7ca2e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
@@ -27,10 +27,16 @@ import org.apache.impala.thrift.TTableName;
 import com.google.common.base.Preconditions;
 
 /**
- * Representation of a REFRESH/INVALIDATE METADATA statement.
+ * Representation of the following statements:
+ * INVALIDATE METADATA
+ * INVALIDATE METADATA <table>
+ * REFRESH <table>
+ * REFRESH <table> PARTITION <partition>
+ * REFRESH FUNCTIONS <database>
  */
 public class ResetMetadataStmt extends StatementBase {
-  // Updated during analysis. Null if invalidating the entire catalog.
+  // Updated during analysis. Null if invalidating the entire catalog or refreshing
+  // database functions.
   private TableName tableName_;
 
   // true if it is a REFRESH statement.
@@ -39,16 +45,36 @@ public class ResetMetadataStmt extends StatementBase {
   // not null when refreshing a single partition
   private final PartitionSpec partitionSpec_;
 
-  public ResetMetadataStmt(TableName name, boolean isRefresh,
-      PartitionSpec partitionSpec) {
-    Preconditions.checkArgument(!isRefresh || name != null);
-    Preconditions.checkArgument(isRefresh || partitionSpec == null);
-    this.tableName_ = name;
+  // not null when refreshing functions in a database.
+  private final String database_;
+
+  private ResetMetadataStmt(TableName tableName, boolean isRefresh,
+      PartitionSpec partitionSpec, String db) {
+    Preconditions.checkArgument(!isRefresh || (tableName != null || db != null));
+    Preconditions.checkArgument(isRefresh || (partitionSpec == null && db == null));
+    Preconditions.checkArgument(db == null || (
+        tableName == null && isRefresh && partitionSpec == null));
+
+    this.database_ = db;
+    this.tableName_ = tableName;
     this.isRefresh_ = isRefresh;
     this.partitionSpec_ = partitionSpec;
     if (partitionSpec_ != null) partitionSpec_.setTableName(tableName_);
   }
 
+  public static ResetMetadataStmt createInvalidateStmt(TableName tableName) {
+    return new ResetMetadataStmt(tableName, false, null, null);
+  }
+
+  public static ResetMetadataStmt createRefreshTableStmt(TableName tableName,
+      PartitionSpec partitionSpec) {
+    return new ResetMetadataStmt(tableName, true, partitionSpec, null);
+  }
+
+  public static ResetMetadataStmt createRefreshFunctionsStmt(String database) {
+    return new ResetMetadataStmt(null, true, null, database);
+  }
+
   public TableName getTableName() { return tableName_; }
 
   @Override
@@ -85,25 +111,28 @@ public class ResetMetadataStmt extends StatementBase {
   public String toSql() {
     StringBuilder result = new StringBuilder();
     if (isRefresh_) {
-      result.append("INVALIDATE METADATA");
-    } else {
       result.append("REFRESH");
+      if (database_ == null) {
+        result.append(" ").append(tableName_);
+        if (partitionSpec_ != null) result.append(" " + partitionSpec_.toSql());
+      } else {
+        result.append(" FUNCTIONS ").append(database_);
+      }
+    } else {
+      result.append("INVALIDATE METADATA");
+      if (tableName_ != null) result.append(" ").append(tableName_);
     }
-
-    if (tableName_ != null) result.append(" ").append(tableName_);
-    if (partitionSpec_ != null) result.append(" " + partitionSpec_.toSql());
     return result.toString();
   }
 
   public TResetMetadataRequest toThrift() {
-    TResetMetadataRequest  params = new TResetMetadataRequest();
+    TResetMetadataRequest params = new TResetMetadataRequest();
     params.setIs_refresh(isRefresh_);
     if (tableName_ != null) {
       params.setTable_name(new TTableName(tableName_.getDb(), tableName_.getTbl()));
     }
-    if (partitionSpec_ != null) {
-      params.setPartition_spec(partitionSpec_.toThrift());
-    }
+    if (partitionSpec_ != null) params.setPartition_spec(partitionSpec_.toThrift());
+    if (database_ != null) params.setDb_name(database_);
     return params;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 18ece9b..d2a0a82 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.metastore.api.ResourceType;
 import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.ql.exec.FunctionUtils;
+import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.authorization.SentryConfig;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.FileSystemUtil;
@@ -604,6 +605,68 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * Reloads function metadata for 'dbName' database. Populates the 'addedFuncs' list
+   * with functions that were added as a result of this operation. Populates the
+   * 'removedFuncs' list with functions that were removed.
+   */
+  public void refreshFunctions(MetaStoreClient msClient, String dbName,
+      List<TCatalogObject> addedFuncs, List<TCatalogObject> removedFuncs)
+      throws CatalogException {
+    // Create a temporary database that will contain all the functions from the HMS.
+    Db tmpDb;
+    try {
+      List<org.apache.hadoop.hive.metastore.api.Function> javaFns =
+          Lists.newArrayList();
+      for (String javaFn : msClient.getHiveClient().getFunctions(dbName, "*")) {
+        javaFns.add(msClient.getHiveClient().getFunction(dbName, javaFn));
+      }
+      // Contains native functions in it's params map.
+      org.apache.hadoop.hive.metastore.api.Database msDb =
+          msClient.getHiveClient().getDatabase(dbName);
+      tmpDb = new Db(dbName, this, null);
+      // Load native UDFs into the temporary db.
+      loadFunctionsFromDbParams(tmpDb, msDb);
+      // Load Java UDFs from HMS into the temporary db.
+      loadJavaFunctions(tmpDb, javaFns);
+
+      Db db = dbCache_.get().get(dbName);
+      if (db == null) {
+        throw new DatabaseNotFoundException("Database does not exist: " + dbName);
+      }
+      // Load transient functions into the temporary db.
+      for (Function fn: db.getTransientFunctions()) tmpDb.addFunction(fn);
+
+      // Compute the removed functions and remove them from the db.
+      for (Map.Entry<String, List<Function>> e: db.getAllFunctions().entrySet()) {
+        for (Function fn: e.getValue()) {
+          if (tmpDb.getFunction(
+              fn, Function.CompareMode.IS_INDISTINGUISHABLE) == null) {
+            fn.setCatalogVersion(incrementAndGetCatalogVersion());
+            removedFuncs.add(fn.toTCatalogObject());
+          }
+        }
+      }
+
+      // We will re-add all the functions to the db because it's possible that a
+      // function was dropped and a different function (for example, the binary is
+      // different) with the same name and signature was re-added in Hive.
+      db.removeAllFunctions();
+      for (Map.Entry<String, List<Function>> e: tmpDb.getAllFunctions().entrySet()) {
+        for (Function fn: e.getValue()) {
+          // We do not need to increment and acquire a new catalog version for this
+          // function here because this already happens when the functions are loaded
+          // into tmpDb.
+          db.addFunction(fn);
+          addedFuncs.add(fn.toTCatalogObject());
+        }
+      }
+
+    } catch (Exception e) {
+      throw new CatalogException("Error refreshing functions in " + dbName + ": ", e);
+    }
+  }
+
+  /**
    * Invalidates the database 'db'. This method can have potential race
    * conditions with external changes to the Hive metastore and hence any
    * conflicting changes to the objects can manifest in the form of exceptions

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/main/java/org/apache/impala/catalog/Db.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index d8f5719..074ff92 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -365,6 +365,12 @@ public class Db implements CatalogObject {
     }
   }
 
+  public void removeAllFunctions() {
+    synchronized (functions_) {
+      functions_.clear();
+    }
+  }
+
   /**
    * Removes a Function with the matching signature string. Returns the removed Function
    * if a Function was removed as a result of this call, null otherwise.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/main/java/org/apache/impala/catalog/Function.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Function.java b/fe/src/main/java/org/apache/impala/catalog/Function.java
index f7f2632..2f0859f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Function.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Function.java
@@ -25,6 +25,7 @@ import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TAggregateFunction;
+import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TColumnType;
 import org.apache.impala.thrift.TFunction;
@@ -310,6 +311,14 @@ public class Function implements CatalogObject {
   // Child classes must override this function.
   public String toSql(boolean ifNotExists) { return ""; }
 
+  public TCatalogObject toTCatalogObject () {
+    TCatalogObject result = new TCatalogObject();
+    result.setType(TCatalogObjectType.FUNCTION);
+    result.setFn(toThrift());
+    result.setCatalog_version(catalogVersion_);
+    return result;
+  }
+
   public TFunction toThrift() {
     TFunction fn = new TFunction();
     fn.setSignature(signatureString());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 521a844..45db6a2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -253,6 +253,9 @@ public class ImpaladCatalog extends Catalog {
         addTable(catalogObject.getTable(), catalogObject.getCatalog_version());
         break;
       case FUNCTION:
+        // Remove the function first, in case there is an existing function with the same
+        // name and signature.
+        removeFunction(catalogObject.getFn(), catalogObject.getCatalog_version());
         addFunction(catalogObject.getFn(), catalogObject.getCatalog_version());
         break;
       case DATA_SOURCE:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 3f7365f..c0246e8 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -987,14 +987,6 @@ public class CatalogOpExecutor {
         resp.result.getUpdated_catalog_object_DEPRECATED().getCatalog_version());
   }
 
-  private TCatalogObject buildTCatalogFnObject(Function fn) {
-    TCatalogObject result = new TCatalogObject();
-    result.setType(TCatalogObjectType.FUNCTION);
-    result.setFn(fn.toThrift());
-    result.setCatalog_version(fn.getCatalogVersion());
-    return result;
-  }
-
  private void createFunction(TCreateFunctionParams params, TDdlExecResponse resp)
       throws ImpalaException {
     Function fn = Function.fromThrift(params.getFn());
@@ -1042,14 +1034,14 @@ public class CatalogOpExecutor {
                   addedFn.signatureString()));
             }
             Preconditions.checkState(catalog_.addFunction(addedFn));
-            addedFunctions.add(buildTCatalogFnObject(addedFn));
+            addedFunctions.add(addedFn.toTCatalogObject());
           }
         }
       } else {
         if (catalog_.addFunction(fn)) {
           // Flush DB changes to metastore
           applyAlterDatabase(catalog_.getDb(fn.dbName()));
-          addedFunctions.add(buildTCatalogFnObject(fn));
+          addedFunctions.add(fn.toTCatalogObject());
         }
       }
 
@@ -1514,7 +1506,7 @@ public class CatalogOpExecutor {
             continue;
           }
           Preconditions.checkNotNull(catalog_.removeFunction(fn));
-          removedFunctions.add(buildTCatalogFnObject(fn));
+          removedFunctions.add(fn.toTCatalogObject());
         }
       } else {
         ArrayList<Type> argTypes = Lists.newArrayList();
@@ -1531,7 +1523,7 @@ public class CatalogOpExecutor {
         } else {
           // Flush DB changes to metastore
           applyAlterDatabase(catalog_.getDb(fn.dbName()));
-          removedFunctions.add(buildTCatalogFnObject(fn));
+          removedFunctions.add(fn.toTCatalogObject());
         }
       }
 
@@ -3099,7 +3091,19 @@ public class CatalogOpExecutor {
     resp.setResult(new TCatalogUpdateResult());
     resp.getResult().setCatalog_service_id(JniCatalog.getServiceId());
 
-    if (req.isSetTable_name()) {
+    if (req.isSetDb_name()) {
+      // This is a "refresh functions" operation.
+      synchronized (metastoreDdlLock_) {
+        try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+          List<TCatalogObject> addedFuncs = Lists.newArrayList();
+          List<TCatalogObject> removedFuncs = Lists.newArrayList();
+          catalog_.refreshFunctions(msClient, req.getDb_name(), addedFuncs, removedFuncs);
+          resp.result.setUpdated_catalog_objects(addedFuncs);
+          resp.result.setRemoved_catalog_objects(removedFuncs);
+          resp.result.setVersion(catalog_.getCatalogVersion());
+        }
+      }
+    } else if (req.isSetTable_name()) {
       // Results of an invalidate operation, indicating whether the table was removed
       // from the Metastore, and whether a new database was added to Impala as a result
       // of the invalidate operation. Always false for refresh.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 13faba5..0345260 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -222,20 +222,20 @@ public class Frontend {
       TUpdateCatalogCacheRequest req) throws CatalogException {
     ImpaladCatalog catalog = impaladCatalog_;
 
+    if (req.is_delta) return catalog.updateCatalog(req);
+
     // If this is not a delta, this update should replace the current
     // Catalog contents so create a new catalog and populate it.
-    if (!req.is_delta) catalog = new ImpaladCatalog(defaultKuduMasterHosts_);
+    catalog = new ImpaladCatalog(defaultKuduMasterHosts_);
 
     TUpdateCatalogCacheResponse response = catalog.updateCatalog(req);
 
-    if (!req.is_delta) {
-      // This was not a delta update. Now that the catalog has been updated,
-      // replace the references to impaladCatalog_/authzChecker_ ensure
-      // clients continue don't see the catalog disappear.
-      impaladCatalog_ = catalog;
-      authzChecker_.set(new AuthorizationChecker(authzConfig_,
-          impaladCatalog_.getAuthPolicy()));
-    }
+    // Now that the catalog has been updated, replace the references to
+    // impaladCatalog_/authzChecker_. This ensures that clients don't see
+    // the catalog disappear.
+    impaladCatalog_ = catalog;
+    authzChecker_.set(new AuthorizationChecker(authzConfig_,
+        impaladCatalog_.getAuthPolicy()));
     return response;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index 3650ace..bda9125 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -3076,6 +3076,7 @@ public class ParserTest extends FrontendTestBase {
     ParsesOk("refresh Foo partition (col=2)");
     ParsesOk("refresh Foo.S partition (col=2)");
     ParsesOk("refresh Foo.S partition (col1 = 2, col2 = 3)");
+    ParsesOk("refresh functions Foo");
 
     ParserError("invalidate");
     ParserError("invalidate metadata Foo.S.S");
@@ -3085,6 +3086,7 @@ public class ParserTest extends FrontendTestBase {
     ParserError("refresh");
     ParserError("refresh Foo.S partition (col1 = 2, col2)");
     ParserError("refresh Foo.S partition ()");
+    ParserError("refresh functions Foo.S");
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index 98e0e3a..3094df4 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -1308,4 +1308,25 @@ public class ToSqlTest extends FrontendTestBase {
         "WITH t AS (SELECT * FROM functional.alltypes TABLESAMPLE SYSTEM(5)) " +
         "SELECT * FROM t");
   }
+
+  /**
+   * Tests invalidate statements are output correctly.
+   */
+  @Test
+  public void testInvalidate() {
+    testToSql("INVALIDATE METADATA", "INVALIDATE METADATA");
+    testToSql("INVALIDATE METADATA functional.alltypes",
+        "INVALIDATE METADATA functional.alltypes");
+  }
+
+  /**
+   * Tests refresh statements are output correctly.
+   */
+  @Test
+  public void testRefresh() {
+    testToSql("REFRESH functional.alltypes", "REFRESH functional.alltypes");
+    testToSql("REFRESH functional.alltypes PARTITION (year=2009, month=1)",
+        "REFRESH functional.alltypes PARTITION (year=2009, month=1)");
+    testToSql("REFRESH FUNCTIONS functional", "REFRESH FUNCTIONS functional");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24ff0f2f/tests/custom_cluster/test_permanent_udfs.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_permanent_udfs.py b/tests/custom_cluster/test_permanent_udfs.py
index ae1c19f..3823c55 100644
--- a/tests/custom_cluster/test_permanent_udfs.py
+++ b/tests/custom_cluster/test_permanent_udfs.py
@@ -18,6 +18,7 @@
 import glob
 import os
 import pytest
+import re
 import shutil
 import subprocess
 
@@ -28,8 +29,9 @@ from tests.common.test_dimensions import create_uncompressed_text_dimension
 from tests.util.filesystem_utils import get_fs_path
 
 class TestUdfPersistence(CustomClusterTestSuite):
-  """ Tests the behavior of UDFs and UDAs between catalog restarts. With IMPALA-1748, these
-  functions are persisted to the metastore and are loaded again during catalog startup"""
+  """ Tests the behavior of UDFs and UDAs between catalog restarts. With IMPALA-1748,
+  these functions are persisted to the metastore and are loaded again during catalog
+  startup"""
 
   DATABASE = 'udf_permanent_test'
   JAVA_FN_TEST_DB = 'java_permanent_test'
@@ -183,7 +185,7 @@ class TestUdfPersistence(CustomClusterTestSuite):
   @SkipIfLocal.hive
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-     catalogd_args= "--local_library_dir=%s" % LOCAL_LIBRARY_DIR)
+     catalogd_args= "--local_library_dir={0}".format(LOCAL_LIBRARY_DIR))
   def test_java_udfs_hive_integration(self):
     ''' This test checks the integration between Hive and Impala on
     CREATE FUNCTION and DROP FUNCTION statements for persistent Java UDFs.
@@ -215,24 +217,140 @@ class TestUdfPersistence(CustomClusterTestSuite):
       assert "does not exist" in hive_stdout
 
     # Create the same set of functions from Hive and make sure they are visible
-    # in Impala.
-    for (fn, fn_symbol) in self.SAMPLE_JAVA_UDFS:
-      self.run_stmt_in_hive(self.CREATE_HIVE_UDF_TEMPLATE.format(
-          db=self.HIVE_IMPALA_INTEGRATION_DB, function=fn,
-          location=self.HIVE_UDF_JAR, symbol=fn_symbol))
-    self.client.execute("INVALIDATE METADATA")
-    for (fn, fn_symbol) in self.SAMPLE_JAVA_UDFS:
-      result = self.client.execute("SHOW FUNCTIONS IN %s" %
-          self.HIVE_IMPALA_INTEGRATION_DB)
-      assert result is not None and len(result.data) > 0 and\
-          fn in str(result.data)
-      self.__verify_udf_in_impala(fn)
-      # Drop the function in Hive and make sure it reflects in Impala.
-      self.run_stmt_in_hive(self.DROP_JAVA_UDF_TEMPLATE.format(
-          db=self.HIVE_IMPALA_INTEGRATION_DB, function=fn))
-    self.client.execute("INVALIDATE METADATA")
-    self.verify_function_count(
-            "SHOW FUNCTIONS in {0}".format(self.HIVE_IMPALA_INTEGRATION_DB), 0)
+    # in Impala. There are two ways to make functions visible in Impala: invalidate
+    # metadata and refresh functions <db>.
+    REFRESH_COMMANDS = ["INVALIDATE METADATA",
+        "REFRESH FUNCTIONS {0}".format(self.HIVE_IMPALA_INTEGRATION_DB)]
+    for refresh_command in REFRESH_COMMANDS:
+      for (fn, fn_symbol) in self.SAMPLE_JAVA_UDFS:
+        self.run_stmt_in_hive(self.CREATE_HIVE_UDF_TEMPLATE.format(
+            db=self.HIVE_IMPALA_INTEGRATION_DB, function=fn,
+            location=self.HIVE_UDF_JAR, symbol=fn_symbol))
+      self.client.execute(refresh_command)
+      for (fn, fn_symbol) in self.SAMPLE_JAVA_UDFS:
+        result = self.client.execute("SHOW FUNCTIONS IN {0}".format(
+            self.HIVE_IMPALA_INTEGRATION_DB))
+        assert result is not None and len(result.data) > 0 and\
+            fn in str(result.data)
+        self.__verify_udf_in_impala(fn)
+        # Drop the function in Hive and make sure it reflects in Impala.
+        self.run_stmt_in_hive(self.DROP_JAVA_UDF_TEMPLATE.format(
+            db=self.HIVE_IMPALA_INTEGRATION_DB, function=fn))
+      self.client.execute(refresh_command)
+      self.verify_function_count(
+          "SHOW FUNCTIONS in {0}".format(self.HIVE_IMPALA_INTEGRATION_DB), 0)
+      # Make sure we deleted all the temporary jars we copied to the local fs
+      assert len(glob.glob(self.LOCAL_LIBRARY_DIR + "/*.jar")) == 0
+
+  @SkipIfIsilon.hive
+  @SkipIfS3.hive
+  @SkipIfLocal.hive
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+     catalogd_args= "--local_library_dir={0}".format(LOCAL_LIBRARY_DIR))
+  def test_refresh_native(self):
+    ''' This test checks that a native function is visible in Impala after a
+    REFRESH FUNCTIONS command. We will add the native function through Hive
+    by setting DBPROPERTIES of a database.'''
+    # First we create the function in Impala.
+    create_func_impala = ("create function {database}.identity_tmp(bigint) "
+        "returns bigint location '{location}' symbol='Identity'")
+    self.client.execute(create_func_impala.format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB,
+        location=get_fs_path('/test-warehouse/libTestUdfs.so')))
+
+    # Impala puts the native function into a database property table. We extract the key
+    # value pair that represents the function from the table.
+    describe_db_hive = "DESCRIBE DATABASE EXTENDED {database}".format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB)
+    result = self.run_stmt_in_hive(describe_db_hive)
+    regex = r"{(.*?)=(.*?)}"
+    match = re.search(regex, result)
+    func_name = match.group(1)
+    func_contents = match.group(2)
+
+    # Recreate the database, this deletes the function.
+    self.client.execute("DROP DATABASE {database} CASCADE".format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB))
+    self.client.execute("CREATE DATABASE {database}".format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB))
+    result = self.client.execute("SHOW FUNCTIONS IN {database}".format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB))
+    assert result is not None and len(result.data) == 0
+
+    # Place the function into the recreated database by modifying it's properties.
+    alter_db_hive = "ALTER DATABASE {database} SET DBPROPERTIES ('{fn_name}'='{fn_val}')"
+    self.run_stmt_in_hive(alter_db_hive.format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB,
+        fn_name=func_name,
+        fn_val=func_contents))
+    result = self.client.execute("SHOW FUNCTIONS IN {database}".format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB))
+    assert result is not None and len(result.data) == 0
+
+    # The function should be visible in Impala after a REFRESH FUNCTIONS.
+    self.client.execute("REFRESH FUNCTIONS {database}".format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB))
+    result = self.client.execute("SHOW FUNCTIONS IN {database}".format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB))
+    assert result is not None and len(result.data) > 0 and\
+        "identity_tmp" in str(result.data)
+
+    # Verify that the function returns a correct result.
+    result = self.client.execute("SELECT {database}.identity_tmp(10)".format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB))
+    assert result.data[0] == "10"
+    # Make sure we deleted all the temporary jars we copied to the local fs
+    assert len(glob.glob(self.LOCAL_LIBRARY_DIR + "/*.jar")) == 0
+
+  @SkipIfIsilon.hive
+  @SkipIfS3.hive
+  @SkipIfLocal.hive
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+     catalogd_args= "--local_library_dir={0}".format(LOCAL_LIBRARY_DIR))
+  def test_refresh_replace(self):
+    ''' This test checks that if we drop a function and then create a
+    different function with the same name in Hive, the new function will
+    be visible in Impala after REFRESH FUNCTIONS.'''
+    # Create an original function.
+    create_orig_func_hive = ("create function {database}.test_func as "
+        "'org.apache.hadoop.hive.ql.udf.UDFHex' using jar '{jar}'")
+    self.run_stmt_in_hive(create_orig_func_hive.format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB, jar=self.JAVA_UDF_JAR))
+    result = self.client.execute("SHOW FUNCTIONS IN {database}".format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB))
+    assert result is not None and len(result.data) == 0
+    # Verify the function becomes visible in Impala after REFRESH FUNCTIONS.
+    self.client.execute("REFRESH FUNCTIONS {database}".format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB))
+    result = self.client.execute("SHOW FUNCTIONS IN {database}".format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB))
+    assert (result is not None and len(result.data) == 3 and
+        "test_func" in str(result.data))
+    result = self.client.execute("SELECT {database}.test_func(123)".format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB))
+    assert result.data[0] == "7B"
+
+    # Drop the original function and create a different function with the same name as
+    # the original, but a different JAR.
+    drop_orig_func_hive = "DROP FUNCTION {database}.test_func"
+    self.run_stmt_in_hive(drop_orig_func_hive.format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB))
+    create_replacement_func_hive = ("create function {database}.test_func as "
+        "'org.apache.hadoop.hive.ql.udf.UDFBin' using jar '{jar}'")
+    self.run_stmt_in_hive(create_replacement_func_hive.format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB, jar=self.JAVA_UDF_JAR))
+    self.client.execute("REFRESH FUNCTIONS {database}".format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB))
+    result = self.client.execute("SHOW FUNCTIONS IN {database}".format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB))
+    assert (result is not None and len(result.data) == 1 and
+        "test_func" in str(result.data))
+    # Verify that the function has actually been updated.
+    result = self.client.execute("SELECT {database}.test_func(123)".format(
+        database=self.HIVE_IMPALA_INTEGRATION_DB))
+    assert result.data[0] == "1111011"
     # Make sure we deleted all the temporary jars we copied to the local fs
     assert len(glob.glob(self.LOCAL_LIBRARY_DIR + "/*.jar")) == 0
 


Mime
View raw message