Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id DD52F200D52 for ; Sat, 18 Nov 2017 07:20:27 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id D29BF160C0A; Sat, 18 Nov 2017 06:20:27 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 853EE160BFB for ; Sat, 18 Nov 2017 07:20:25 +0100 (CET) Received: (qmail 76415 invoked by uid 500); 18 Nov 2017 06:20:24 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 76314 invoked by uid 99); 18 Nov 2017 06:20:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 18 Nov 2017 06:20:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DCF1EDFF17; Sat, 18 Nov 2017 06:20:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: daijy@apache.org To: commits@hive.apache.org Message-Id: <27d613f8828e419c89fc74ef8e5fae82@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hive git commit: HIVE-18056: CachedStore: Have a whitelist/blacklist config to allow selective caching of tables/partitions and allow read while prewarming (Vaibhav Gumashta, Daniel Dai, reviewed by Thejas Nair, Sergey Shelukhin) Date: Sat, 18 Nov 2017 06:20:23 +0000 (UTC) archived-at: Sat, 18 Nov 2017 06:20:28 -0000 Repository: hive Updated Branches: refs/heads/master 66c013305 -> f1698b62c HIVE-18056: CachedStore: Have a whitelist/blacklist config to allow selective caching of tables/partitions and allow read while prewarming (Vaibhav Gumashta, Daniel Dai, reviewed by Thejas Nair, Sergey Shelukhin) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f1698b62 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f1698b62 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f1698b62 Branch: refs/heads/master Commit: f1698b62c9ed9286caad53de9c1cf669e2a38864 Parents: 66c0133 Author: Daniel Dai Authored: Fri Nov 17 22:19:53 2017 -0800 Committer: Daniel Dai Committed: Fri Nov 17 22:19:53 2017 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 11 + .../hive/metastore/cache/CachedStore.java | 802 ++++++++++++------- .../hive/metastore/cache/SharedCache.java | 22 + .../hive/metastore/conf/MetastoreConf.java | 11 + .../hive/metastore/utils/MetaStoreUtils.java | 11 +- 5 files changed, 555 insertions(+), 302 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f1698b62/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 09d9ce3..1a1d50c 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -900,6 +900,17 @@ public class HiveConf extends Configuration { "hive.metastore.cached.rawstore.cache.update.frequency", "60", new TimeValidator( TimeUnit.SECONDS), "The time after which metastore cache is updated from metastore DB."), + METASTORE_CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST( + "hive.metastore.cached.rawstore.cached.object.whitelist", ".*", "Comma separated list of regular expressions \n " + + "to select the tables (and its partitions, stats etc) that will be cached by CachedStore. \n" + + "This can be used in conjunction with hive.metastore.cached.rawstore.cached.object.blacklist. \n" + + "Example: .*, db1.*, db2\\.tbl.*. The last item can potentially override patterns specified before."), + METASTORE_CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST( + "hive.metastore.cached.rawstore.cached.object.blacklist", "", "Comma separated list of regular expressions \n " + + "to filter out the tables (and its partitions, stats etc) that will be cached by CachedStore. \n" + + "This can be used in conjunction with hive.metastore.cached.rawstore.cached.object.whitelist. \n" + + "Example: db2.*, db3\\.tbl1, db3\\..*. The last item can potentially override patterns specified before. \n" + + "The blacklist also overrides the whitelist."), METASTORE_TXN_STORE_IMPL("hive.metastore.txn.store.impl", "org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler", "Name of class that implements org.apache.hadoop.hive.metastore.txn.TxnStore. This " + http://git-wip-us.apache.org/repos/asf/hive/blob/f1698b62/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index c61f27b..2eb967b 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -21,6 +21,8 @@ import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -31,6 +33,8 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; @@ -124,6 +128,8 @@ public class CachedStore implements RawStore, Configurable { private static ReentrantReadWriteLock partitionColStatsCacheLock = new ReentrantReadWriteLock( true); private static AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false); + private static List whitelistPatterns = null; + private static List blacklistPatterns = null; private RawStore rawStore = null; private Configuration conf; private PartitionExpressionProxy expressionProxy = null; @@ -240,6 +246,7 @@ public class CachedStore implements RawStore, Configurable { if (expressionProxy == null || conf != oldConf) { expressionProxy = PartFilterExprUtil.createExpressionProxy(conf); } + initBlackListWhiteList(conf); } @VisibleForTesting @@ -247,29 +254,46 @@ public class CachedStore implements RawStore, Configurable { // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy Deadline.registerIfNot(1000000); List dbNames = rawStore.getAllDatabases(); + LOG.info("Number of databases to prewarm: " + dbNames.size()); SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); - for (String dbName : dbNames) { + for (int i = 0; i < dbNames.size(); i++) { + String dbName = StringUtils.normalizeIdentifier(dbNames.get(i)); + LOG.info("Caching database: {}. Cached {} / {} databases so far.", dbName, i, dbNames.size()); Database db = rawStore.getDatabase(dbName); - sharedCache.addDatabaseToCache(StringUtils.normalizeIdentifier(dbName), db); + sharedCache.addDatabaseToCache(dbName, db); List tblNames = rawStore.getAllTables(dbName); - for (String tblName : tblNames) { - Table table = rawStore.getTable(dbName, tblName); - sharedCache.addTableToCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), table); - Deadline.startTimer("getPartitions"); - List partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); - Deadline.stopTimer(); - for (Partition partition : partitions) { - sharedCache.addPartitionToCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partition); + LOG.debug("Tables in database: {} : {}", dbName, tblNames); + for (int j = 0; j < tblNames.size(); j++) { + String tblName = StringUtils.normalizeIdentifier(tblNames.get(j)); + if (!shouldCacheTable(dbName, tblName)) { + LOG.info("Not caching database: {}'s table: {}", dbName, tblName); + continue; } - // Cache partition column stats - Deadline.startTimer("getColStatsForTablePartitions"); - Map> colStatsPerPartition = - rawStore.getColStatsForTablePartitions(dbName, tblName); - Deadline.stopTimer(); - if (colStatsPerPartition != null) { - sharedCache.addPartitionColStatsToCache(dbName, tblName, colStatsPerPartition); + LOG.info("Caching database: {}'s table: {}. Cached {} / {} tables so far.", dbName, + tblName, j, tblNames.size()); + Table table = null; + table = rawStore.getTable(dbName, tblName); + // It is possible the table is deleted during fetching tables of the database, + // in that case, continue with the next table + if (table == null) { + continue; + } + sharedCache.addTableToCache(dbName, tblName, table); + if (table.getPartitionKeys() != null && table.getPartitionKeys().size() > 0) { + Deadline.startTimer("getPartitions"); + List partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); + Deadline.stopTimer(); + for (Partition partition : partitions) { + sharedCache.addPartitionToCache(dbName, tblName, partition); + } + // Cache partition column stats + Deadline.startTimer("getColStatsForTablePartitions"); + Map> colStatsPerPartition = + rawStore.getColStatsForTablePartitions(dbName, tblName); + Deadline.stopTimer(); + if (colStatsPerPartition != null) { + sharedCache.addPartitionColStatsToCache(dbName, tblName, colStatsPerPartition); + } } // Cache table column stats List colNames = MetaStoreUtils.getColumnNamesForTable(table); @@ -278,16 +302,35 @@ public class CachedStore implements RawStore, Configurable { rawStore.getTableColumnStatistics(dbName, tblName, colNames); Deadline.stopTimer(); if ((tableColStats != null) && (tableColStats.getStatsObjSize() > 0)) { - sharedCache.addTableColStatsToCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj()); + sharedCache.addTableColStatsToCache(dbName, tblName, tableColStats.getStatsObj()); } } } + // Notify all blocked threads that prewarm is complete now + sharedCacheWrapper.notifyAllBlocked(); + } + + private static void initBlackListWhiteList(Configuration conf) { + if (whitelistPatterns == null || blacklistPatterns == null) { + whitelistPatterns = createPatterns(MetastoreConf.getAsString(conf, + MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST)); + blacklistPatterns = createPatterns(MetastoreConf.getAsString(conf, + MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST)); + // The last specified blacklist pattern gets precedence + Collections.reverse(blacklistPatterns); + } } @VisibleForTesting synchronized static void startCacheUpdateService(Configuration conf) { if (cacheUpdateMaster == null) { + initBlackListWhiteList(conf); + if (!MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST)) { + cacheRefreshPeriod = + MetastoreConf.getTimeVar(conf, ConfVars.CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, + TimeUnit.MILLISECONDS); + } + LOG.info("CachedStore: starting cache update service (run every " + cacheRefreshPeriod + "ms"); cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -297,12 +340,6 @@ public class CachedStore implements RawStore, Configurable { return t; } }); - if (!MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST)) { - cacheRefreshPeriod = - MetastoreConf.getTimeVar(conf, ConfVars.CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, - TimeUnit.MILLISECONDS); - } - LOG.info("CachedStore: starting cache update service (run every " + cacheRefreshPeriod + "ms"); cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(conf), 0, cacheRefreshPeriod, TimeUnit.MILLISECONDS); } @@ -354,9 +391,12 @@ public class CachedStore implements RawStore, Configurable { if (isFirstRun) { while (isFirstRun) { try { + long startTime = System.nanoTime(); LOG.info("Prewarming CachedStore"); prewarm(rawStore); LOG.info("CachedStore initialized"); + long endTime = System.nanoTime(); + LOG.info("Time taken in prewarming = " + (endTime - startTime) / 1000000 + "ms"); } catch (Exception e) { LOG.error("Prewarm failure", e); sharedCacheWrapper.updateInitState(e, false); @@ -384,6 +424,9 @@ public class CachedStore implements RawStore, Configurable { updateTables(rawStore, dbName); List tblNames = getAllTablesInternal(dbName, sharedCacheWrapper.getUnsafe()); for (String tblName : tblNames) { + if (!shouldCacheTable(dbName, tblName)) { + continue; + } // Update the partitions for a table in cache updateTablePartitions(rawStore, dbName, tblName); // Update the table column stats for a table in cache @@ -433,6 +476,9 @@ public class CachedStore implements RawStore, Configurable { try { List tblNames = rawStore.getAllTables(dbName); for (String tblName : tblNames) { + if (!shouldCacheTable(dbName, tblName)) { + continue; + } Table table = rawStore.getTable(StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName)); @@ -591,11 +637,13 @@ public class CachedStore implements RawStore, Configurable { @Override public Database getDatabase(String dbName) throws NoSuchObjectException { SharedCache sharedCache; + + if (!sharedCacheWrapper.isInitialized()) { + return rawStore.getDatabase(dbName); + } + try { sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getDatabase(dbName); - } } catch (MetaException e) { throw new RuntimeException(e); // TODO: why doesn't getDatabase throw MetaEx? } @@ -611,7 +659,7 @@ public class CachedStore implements RawStore, Configurable { boolean succ = rawStore.dropDatabase(dbname); if (succ) { SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if (sharedCache == null) return succ; try { // Wait if background cache update is happening databaseCacheLock.readLock().lock(); @@ -630,7 +678,7 @@ public class CachedStore implements RawStore, Configurable { boolean succ = rawStore.alterDatabase(dbName, db); if (succ) { SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if (sharedCache == null) return succ; try { // Wait if background cache update is happening databaseCacheLock.readLock().lock(); @@ -645,10 +693,10 @@ public class CachedStore implements RawStore, Configurable { @Override public List getDatabases(String pattern) throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if (!sharedCacheWrapper.isInitialized()) { return rawStore.getDatabases(pattern); } + SharedCache sharedCache = sharedCacheWrapper.get(); List results = new ArrayList<>(); for (String dbName : sharedCache.listCachedDatabases()) { dbName = StringUtils.normalizeIdentifier(dbName); @@ -661,10 +709,10 @@ public class CachedStore implements RawStore, Configurable { @Override public List getAllDatabases() throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if (!sharedCacheWrapper.isInitialized()) { return rawStore.getAllDatabases(); } + SharedCache sharedCache = sharedCacheWrapper.get(); return sharedCache.listCachedDatabases(); } @@ -704,34 +752,42 @@ public class CachedStore implements RawStore, Configurable { @Override public void createTable(Table tbl) throws InvalidObjectException, MetaException { rawStore.createTable(tbl); - validateTableType(tbl); + String dbName = StringUtils.normalizeIdentifier(tbl.getDbName()); + String tblName = StringUtils.normalizeIdentifier(tbl.getTableName()); + if (!shouldCacheTable(dbName, tblName)) { + return; + } SharedCache sharedCache = sharedCacheWrapper.get(); if (sharedCache == null) return; + validateTableType(tbl); try { // Wait if background cache update is happening tableCacheLock.readLock().lock(); isTableCacheDirty.set(true); - sharedCache.addTableToCache(StringUtils.normalizeIdentifier(tbl.getDbName()), - StringUtils.normalizeIdentifier(tbl.getTableName()), tbl); + sharedCache.addTableToCache(dbName, tblName, tbl); } finally { tableCacheLock.readLock().unlock(); } } @Override - public boolean dropTable(String dbName, String tableName) throws MetaException, + public boolean dropTable(String dbName, String tblName) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { - boolean succ = rawStore.dropTable(dbName, tableName); + boolean succ = rawStore.dropTable(dbName, tblName); if (succ) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(dbName, tblName)) { + return succ; + } SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if (sharedCache == null) return succ; // Remove table try { // Wait if background table cache update is happening tableCacheLock.readLock().lock(); isTableCacheDirty.set(true); - sharedCache.removeTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName)); + sharedCache.removeTableFromCache(dbName, tblName); } finally { tableCacheLock.readLock().unlock(); } @@ -740,8 +796,7 @@ public class CachedStore implements RawStore, Configurable { // Wait if background table col stats cache update is happening tableColStatsCacheLock.readLock().lock(); isTableColStatsCacheDirty.set(true); - sharedCache.removeTableColStatsFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName)); + sharedCache.removeTableColStatsFromCache(dbName, tblName); } finally { tableColStatsCacheLock.readLock().unlock(); } @@ -750,13 +805,14 @@ public class CachedStore implements RawStore, Configurable { } @Override - public Table getTable(String dbName, String tableName) throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getTable(dbName, tableName); + public Table getTable(String dbName, String tblName) throws MetaException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + return rawStore.getTable(dbName, tblName); } - Table tbl = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName)); + SharedCache sharedCache = sharedCacheWrapper.get(); + Table tbl = sharedCache.getTableFromCache(dbName, tblName); if (tbl != null) { tbl.unsetPrivileges(); tbl.setRewriteEnabled(tbl.isRewriteEnabled()); @@ -768,14 +824,18 @@ public class CachedStore implements RawStore, Configurable { public boolean addPartition(Partition part) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartition(part); if (succ) { + String dbName = StringUtils.normalizeIdentifier(part.getDbName()); + String tblName = StringUtils.normalizeIdentifier(part.getTableName()); + if (!shouldCacheTable(dbName, tblName)) { + return succ; + } SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if (sharedCache == null) return succ; try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - sharedCache.addPartitionToCache(StringUtils.normalizeIdentifier(part.getDbName()), - StringUtils.normalizeIdentifier(part.getTableName()), part); + sharedCache.addPartitionToCache(dbName, tblName, part); } finally { partitionCacheLock.readLock().unlock(); } @@ -788,15 +848,19 @@ public class CachedStore implements RawStore, Configurable { throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(dbName, tblName, parts); if (succ) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(dbName, tblName)) { + return succ; + } SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if (sharedCache == null) return succ; try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); for (Partition part : parts) { - sharedCache.addPartitionToCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), part); + sharedCache.addPartitionToCache(dbName, tblName, part); } } finally { partitionCacheLock.readLock().unlock(); @@ -810,8 +874,13 @@ public class CachedStore implements RawStore, Configurable { boolean ifNotExists) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(dbName, tblName, partitionSpec, ifNotExists); if (succ) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(dbName, tblName)) { + return succ; + } SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if (sharedCache == null) return succ; try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); @@ -819,8 +888,7 @@ public class CachedStore implements RawStore, Configurable { PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); while (iterator.hasNext()) { Partition part = iterator.next(); - sharedCache.addPartitionToCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), part); + sharedCache.addPartitionToCache(dbName, tblName, part); } } finally { partitionCacheLock.readLock().unlock(); @@ -830,15 +898,17 @@ public class CachedStore implements RawStore, Configurable { } @Override - public Partition getPartition(String dbName, String tableName, List part_vals) + public Partition getPartition(String dbName, String tblName, List part_vals) throws MetaException, NoSuchObjectException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getPartition(dbName, tableName, part_vals); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + + if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + return rawStore.getPartition(dbName, tblName, part_vals); } + SharedCache sharedCache = sharedCacheWrapper.get(); Partition part = - sharedCache.getPartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), part_vals); + sharedCache.getPartitionFromCache(dbName, tblName, part_vals); if (part != null) { part.unsetPrivileges(); } else { @@ -848,30 +918,35 @@ public class CachedStore implements RawStore, Configurable { } @Override - public boolean doesPartitionExist(String dbName, String tableName, + public boolean doesPartitionExist(String dbName, String tblName, List part_vals) throws MetaException, NoSuchObjectException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.doesPartitionExist(dbName, tableName, part_vals); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + return rawStore.doesPartitionExist(dbName, tblName, part_vals); } - return sharedCache.existPartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), part_vals); + SharedCache sharedCache = sharedCacheWrapper.get(); + return sharedCache.existPartitionFromCache(dbName, tblName, part_vals); } @Override - public boolean dropPartition(String dbName, String tableName, List part_vals) + public boolean dropPartition(String dbName, String tblName, List part_vals) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { - boolean succ = rawStore.dropPartition(dbName, tableName, part_vals); + boolean succ = rawStore.dropPartition(dbName, tblName, part_vals); if (succ) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(dbName, tblName)) { + return succ; + } SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if (sharedCache == null) return succ; // Remove partition try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - sharedCache.removePartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), part_vals); + sharedCache.removePartitionFromCache(dbName, tblName, part_vals); } finally { partitionCacheLock.readLock().unlock(); } @@ -880,8 +955,7 @@ public class CachedStore implements RawStore, Configurable { // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - sharedCache.removePartitionColStatsFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), part_vals); + sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals); } finally { partitionColStatsCacheLock.readLock().unlock(); } @@ -890,14 +964,15 @@ public class CachedStore implements RawStore, Configurable { } @Override - public List getPartitions(String dbName, String tableName, int max) + public List getPartitions(String dbName, String tblName, int max) throws MetaException, NoSuchObjectException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getPartitions(dbName, tableName, max); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + return rawStore.getPartitions(dbName, tblName, max); } - List parts = sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), max); + SharedCache sharedCache = sharedCacheWrapper.get(); + List parts = sharedCache.listCachedPartitions(dbName, tblName, max); if (parts != null) { for (Partition part : parts) { part.unsetPrivileges(); @@ -910,39 +985,77 @@ public class CachedStore implements RawStore, Configurable { public void alterTable(String dbName, String tblName, Table newTable) throws InvalidObjectException, MetaException { rawStore.alterTable(dbName, tblName, newTable); - validateTableType(newTable); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + String newTblName = StringUtils.normalizeIdentifier(newTable.getTableName()); + if (!shouldCacheTable(dbName, tblName) && !shouldCacheTable(dbName, newTblName)) { + return; + } SharedCache sharedCache = sharedCacheWrapper.get(); if (sharedCache == null) return; - // Update table cache - try { - // Wait if background cache update is happening - tableCacheLock.readLock().lock(); - isTableCacheDirty.set(true); - sharedCache.alterTableInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), newTable); - } finally { - tableCacheLock.readLock().unlock(); - } - // Update partition cache (key might have changed since table name is a - // component of key) - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - sharedCache.alterTableInPartitionCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), newTable); - } finally { - partitionCacheLock.readLock().unlock(); + + if (shouldCacheTable(dbName, newTblName)) { + validateTableType(newTable); + // Update table cache + try { + // Wait if background cache update is happening + tableCacheLock.readLock().lock(); + isTableCacheDirty.set(true); + sharedCache.alterTableInCache(StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), newTable); + } finally { + tableCacheLock.readLock().unlock(); + } + // Update partition cache (key might have changed since table name is a + // component of key) + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + sharedCache.alterTableInPartitionCache(StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), newTable); + } finally { + partitionCacheLock.readLock().unlock(); + } + } else { + // Remove the table and its cached partitions, stats etc, + // since it does not pass the whitelist/blacklist filter. + // Remove table + try { + // Wait if background cache update is happening + tableCacheLock.readLock().lock(); + isTableCacheDirty.set(true); + sharedCache.removeTableFromCache(dbName, tblName); + } finally { + tableCacheLock.readLock().unlock(); + } + // Remove partitions + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + sharedCache.removePartitionsFromCache(dbName, tblName); + } finally { + partitionCacheLock.readLock().unlock(); + } + // Remove partition col stats + try { + // Wait if background cache update is happening + partitionColStatsCacheLock.readLock().lock(); + isPartitionColStatsCacheDirty.set(true); + sharedCache.removePartitionColStatsFromCache(dbName, tblName); + } finally { + partitionColStatsCacheLock.readLock().unlock(); + } } } @Override - public List getTables(String dbName, String pattern) - throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + public List getTables(String dbName, String pattern) throws MetaException { + if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) { return rawStore.getTables(dbName, pattern); } + SharedCache sharedCache = sharedCacheWrapper.get(); List tableNames = new ArrayList<>(); for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { if (CacheUtils.matches(table.getTableName(), pattern)) { @@ -953,12 +1066,12 @@ public class CachedStore implements RawStore, Configurable { } @Override - public List getTables(String dbName, String pattern, - TableType tableType) throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getTables(dbName, pattern); + public List getTables(String dbName, String pattern, TableType tableType) + throws MetaException { + if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) { + return rawStore.getTables(dbName, pattern, tableType); } + SharedCache sharedCache = sharedCacheWrapper.get(); List tableNames = new ArrayList<>(); for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { if (CacheUtils.matches(table.getTableName(), pattern) && @@ -970,37 +1083,51 @@ public class CachedStore implements RawStore, Configurable { } @Override - public List getTableMeta(String dbNames, String tableNames, - List tableTypes) throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + public List getTableMeta(String dbNames, String tableNames, List tableTypes) + throws MetaException { + // TODO Check if all required tables are allowed, if so, get it from cache + if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) { return rawStore.getTableMeta(dbNames, tableNames, tableTypes); } + SharedCache sharedCache = sharedCacheWrapper.get(); return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(dbNames), StringUtils.normalizeIdentifier(tableNames), tableTypes); } @Override - public List getTableObjectsByName(String dbName, - List tblNames) throws MetaException, UnknownDBException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + public List
getTableObjectsByName(String dbName, List tblNames) + throws MetaException, UnknownDBException { + dbName = StringUtils.normalizeIdentifier(dbName); + boolean missSomeInCache = false; + for (String tblName : tblNames) { + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(dbName, tblName)) { + missSomeInCache = true; + break; + } + } + if (!sharedCacheWrapper.isInitialized() || missSomeInCache) { return rawStore.getTableObjectsByName(dbName, tblNames); } + SharedCache sharedCache = sharedCacheWrapper.get(); List
tables = new ArrayList<>(); for (String tblName : tblNames) { - tables.add(sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName))); + tblName = StringUtils.normalizeIdentifier(tblName); + Table tbl = sharedCache.getTableFromCache(dbName, tblName); + if (tbl == null) { + tbl = rawStore.getTable(dbName, tblName); + } + tables.add(tbl); } return tables; } @Override public List getAllTables(String dbName) throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) { return rawStore.getAllTables(dbName); } + SharedCache sharedCache = sharedCacheWrapper.get(); return getAllTablesInternal(dbName, sharedCache); } @@ -1013,12 +1140,12 @@ public class CachedStore implements RawStore, Configurable { } @Override - public List listTableNamesByFilter(String dbName, String filter, - short max_tables) throws MetaException, UnknownDBException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + public List listTableNamesByFilter(String dbName, String filter, short max_tables) + throws MetaException, UnknownDBException { + if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) { return rawStore.listTableNamesByFilter(dbName, filter, max_tables); } + SharedCache sharedCache = sharedCacheWrapper.get(); List tableNames = new ArrayList<>(); int count = 0; for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { @@ -1034,16 +1161,16 @@ public class CachedStore implements RawStore, Configurable { @Override public List listPartitionNames(String dbName, String tblName, short max_parts) throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { return rawStore.listPartitionNames(dbName, tblName, max_parts); } + SharedCache sharedCache = sharedCacheWrapper.get(); List partitionNames = new ArrayList<>(); - Table t = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); + Table t = sharedCache.getTableFromCache(dbName, tblName); int count = 0; - for (Partition part : sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), max_parts)) { + for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, max_parts)) { if (max_parts == -1 || count < max_parts) { partitionNames.add(Warehouse.makePartName(t.getPartitionKeys(), part.getValues())); } @@ -1052,32 +1179,36 @@ public class CachedStore implements RawStore, Configurable { } @Override - public PartitionValuesResponse listPartitionValues(String db_name, String tbl_name, List cols, - boolean applyDistinct, String filter, boolean ascending, - List order, long maxParts) throws MetaException { + public PartitionValuesResponse listPartitionValues(String db_name, String tbl_name, + List cols, boolean applyDistinct, String filter, boolean ascending, + List order, long maxParts) throws MetaException { throw new UnsupportedOperationException(); } @Override - public List listPartitionNamesByFilter(String db_name, - String tbl_name, String filter, short max_parts) throws MetaException { + public List listPartitionNamesByFilter(String dbName, + String tblName, String filter, short max_parts) throws MetaException { // TODO Translate filter -> expr - return null; + return rawStore.listPartitionNamesByFilter(dbName, tblName, filter, max_parts); } @Override public void alterPartition(String dbName, String tblName, List partVals, Partition newPart) throws InvalidObjectException, MetaException { rawStore.alterPartition(dbName, tblName, partVals, newPart); - // Update partition cache + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(dbName, tblName)) { + return; + } SharedCache sharedCache = sharedCacheWrapper.get(); if (sharedCache == null) return; + // Update partition cache try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - sharedCache.alterPartitionInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partVals, newPart); + sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart); } finally { partitionCacheLock.readLock().unlock(); } @@ -1086,8 +1217,7 @@ public class CachedStore implements RawStore, Configurable { // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - sharedCache.alterPartitionInColStatsCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partVals, newPart); + sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart); } finally { partitionColStatsCacheLock.readLock().unlock(); } @@ -1097,9 +1227,14 @@ public class CachedStore implements RawStore, Configurable { public void alterPartitions(String dbName, String tblName, List> partValsList, List newParts) throws InvalidObjectException, MetaException { rawStore.alterPartitions(dbName, tblName, partValsList, newParts); - // Update partition cache + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(dbName, tblName)) { + return; + } SharedCache sharedCache = sharedCacheWrapper.get(); if (sharedCache == null) return; + // Update partition cache try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); @@ -1107,8 +1242,7 @@ public class CachedStore implements RawStore, Configurable { for (int i = 0; i < partValsList.size(); i++) { List partVals = partValsList.get(i); Partition newPart = newParts.get(i); - sharedCache.alterPartitionInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partVals, newPart); + sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart); } } finally { partitionCacheLock.readLock().unlock(); @@ -1121,8 +1255,7 @@ public class CachedStore implements RawStore, Configurable { for (int i = 0; i < partValsList.size(); i++) { List partVals = partValsList.get(i); Partition newPart = newParts.get(i); - sharedCache.alterPartitionInColStatsCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partVals, newPart); + sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart); } } finally { partitionColStatsCacheLock.readLock().unlock(); @@ -1185,27 +1318,25 @@ public class CachedStore implements RawStore, Configurable { public List getPartitionsByFilter(String dbName, String tblName, String filter, short maxParts) throws MetaException, NoSuchObjectException { - // TODO Auto-generated method stub - return null; + return rawStore.getPartitionsByFilter(dbName, tblName, filter, maxParts); } @Override public boolean getPartitionsByExpr(String dbName, String tblName, byte[] expr, - String defaultPartitionName, short maxParts, List result) - throws TException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getPartitionsByExpr( - dbName, tblName, expr, defaultPartitionName, maxParts, result); + String defaultPartitionName, short maxParts, List result) throws TException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + return rawStore.getPartitionsByExpr(dbName, tblName, expr, defaultPartitionName, maxParts, + result); } + SharedCache sharedCache = sharedCacheWrapper.get(); List partNames = new LinkedList<>(); - Table table = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); - boolean hasUnknownPartitions = getPartitionNamesPrunedByExprNoTxn( - table, expr, defaultPartitionName, maxParts, partNames, sharedCache); + Table table = sharedCache.getTableFromCache(dbName, tblName); + boolean hasUnknownPartitions = getPartitionNamesPrunedByExprNoTxn(table, expr, + defaultPartitionName, maxParts, partNames, sharedCache); for (String partName : partNames) { - Partition part = sharedCache.getPartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partNameToVals(partName)); + Partition part = sharedCache.getPartitionFromCache(dbName, tblName, partNameToVals(partName)); part.unsetPrivileges(); result.add(part); } @@ -1213,31 +1344,25 @@ public class CachedStore implements RawStore, Configurable { } @Override - public int getNumPartitionsByFilter(String dbName, String tblName, - String filter) throws MetaException, NoSuchObjectException { - // TODO filter -> expr - // SharedCache sharedCache = sharedCacheWrapper.get(); - // if (sharedCache == null) { - return rawStore.getNumPartitionsByFilter(dbName, tblName, filter); - // } - // Table table = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - // StringUtils.normalizeIdentifier(tblName)); - // return 0; + public int getNumPartitionsByFilter(String dbName, String tblName, String filter) + throws MetaException, NoSuchObjectException { + return rawStore.getNumPartitionsByFilter(dbName, tblName, filter); } @Override public int getNumPartitionsByExpr(String dbName, String tblName, byte[] expr) throws MetaException, NoSuchObjectException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { return rawStore.getNumPartitionsByExpr(dbName, tblName, expr); } + SharedCache sharedCache = sharedCacheWrapper.get(); String defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME); List partNames = new LinkedList<>(); - Table table = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); - getPartitionNamesPrunedByExprNoTxn( - table, expr, defaultPartName, Short.MAX_VALUE, partNames, sharedCache); + Table table = sharedCache.getTableFromCache(dbName, tblName); + getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartName, Short.MAX_VALUE, partNames, + sharedCache); return partNames.size(); } @@ -1254,14 +1379,15 @@ public class CachedStore implements RawStore, Configurable { @Override public List getPartitionsByNames(String dbName, String tblName, List partNames) throws MetaException, NoSuchObjectException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { return rawStore.getPartitionsByNames(dbName, tblName, partNames); } + SharedCache sharedCache = sharedCacheWrapper.get(); List partitions = new ArrayList<>(); for (String partName : partNames) { - Partition part = sharedCache.getPartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partNameToVals(partName)); + Partition part = sharedCache.getPartitionFromCache(dbName, tblName, partNameToVals(partName)); if (part!=null) { partitions.add(part); } @@ -1429,15 +1555,15 @@ public class CachedStore implements RawStore, Configurable { public Partition getPartitionWithAuth(String dbName, String tblName, List partVals, String userName, List groupNames) throws MetaException, NoSuchObjectException, InvalidObjectException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { return rawStore.getPartitionWithAuth(dbName, tblName, partVals, userName, groupNames); } - Partition p = sharedCache.getPartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partVals); + SharedCache sharedCache = sharedCacheWrapper.get(); + Partition p = sharedCache.getPartitionFromCache(dbName, tblName, partVals); if (p!=null) { - Table t = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); + Table t = sharedCache.getTableFromCache(dbName, tblName); String partName = Warehouse.makePartName(t.getPartitionKeys(), partVals); PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName, userName, groupNames); @@ -1450,16 +1576,16 @@ public class CachedStore implements RawStore, Configurable { public List getPartitionsWithAuth(String dbName, String tblName, short maxParts, String userName, List groupNames) throws MetaException, NoSuchObjectException, InvalidObjectException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { return rawStore.getPartitionsWithAuth(dbName, tblName, maxParts, userName, groupNames); } - Table t = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); + SharedCache sharedCache = sharedCacheWrapper.get(); + Table t = sharedCache.getTableFromCache(dbName, tblName); List partitions = new ArrayList<>(); int count = 0; - for (Partition part : sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), maxParts)) { + for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) { if (maxParts == -1 || count < maxParts) { String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues()); PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName, @@ -1476,16 +1602,16 @@ public class CachedStore implements RawStore, Configurable { public List listPartitionNamesPs(String dbName, String tblName, List partVals, short maxParts) throws MetaException, NoSuchObjectException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { return rawStore.listPartitionNamesPs(dbName, tblName, partVals, maxParts); } + SharedCache sharedCache = sharedCacheWrapper.get(); List partNames = new ArrayList<>(); int count = 0; - Table t = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); - for (Partition part : sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), maxParts)) { + Table t = sharedCache.getTableFromCache(dbName, tblName); + for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) { boolean psMatch = true; for (int i=0;i listPartitionsPsWithAuth(String dbName, - String tblName, List partVals, short maxParts, String userName, - List groupNames) + public List listPartitionsPsWithAuth(String dbName, String tblName, + List partVals, short maxParts, String userName, List groupNames) throws MetaException, InvalidObjectException, NoSuchObjectException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.listPartitionsPsWithAuth( - dbName, tblName, partVals, maxParts, userName, groupNames); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + return rawStore.listPartitionsPsWithAuth(dbName, tblName, partVals, maxParts, userName, + groupNames); } + SharedCache sharedCache = sharedCacheWrapper.get(); List partitions = new ArrayList<>(); - Table t = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); + Table t = sharedCache.getTableFromCache(dbName, tblName); int count = 0; - for (Partition part : sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), maxParts)) { + for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) { boolean psMatch = true; - for (int i=0;i statsObjs = colStats.getStatsObj(); - Table tbl = getTable(dbName, tableName); + Table tbl = getTable(dbName, tblName); List colNames = new ArrayList<>(); for (ColumnStatisticsObj statsObj : statsObjs) { colNames.add(statsObj.getColName()); } StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames); - // Update table try { // Wait if background cache update is happening tableCacheLock.readLock().lock(); isTableCacheDirty.set(true); - sharedCache.alterTableInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), tbl); + sharedCache.alterTableInCache(dbName, tblName, tbl); } finally { tableCacheLock.readLock().unlock(); } - // Update table col stats try { // Wait if background cache update is happening tableColStatsCacheLock.readLock().lock(); isTableColStatsCacheDirty.set(true); - sharedCache.updateTableColStatsInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), statsObjs); + sharedCache.updateTableColStatsInCache(dbName, tblName, statsObjs); } finally { tableColStatsCacheLock.readLock().unlock(); } @@ -1588,18 +1712,18 @@ public class CachedStore implements RawStore, Configurable { } @Override - public ColumnStatistics getTableColumnStatistics(String dbName, String tableName, + public ColumnStatistics getTableColumnStatistics(String dbName, String tblName, List colNames) throws MetaException, NoSuchObjectException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getTableColumnStatistics(dbName, tableName, colNames); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + return rawStore.getTableColumnStatistics(dbName, tblName, colNames); } - ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName); + SharedCache sharedCache = sharedCacheWrapper.get(); + ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName); List colStatObjs = new ArrayList<>(); for (String colName : colNames) { - String colStatsCacheKey = - CacheUtils.buildKey(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), colName); + String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, colName); ColumnStatisticsObj colStat = sharedCache.getCachedTableColStats(colStatsCacheKey); if (colStat != null) { colStatObjs.add(colStat); @@ -1613,18 +1737,22 @@ public class CachedStore implements RawStore, Configurable { } @Override - public boolean deleteTableColumnStatistics(String dbName, String tableName, String colName) + public boolean deleteTableColumnStatistics(String dbName, String tblName, String colName) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { - boolean succ = rawStore.deleteTableColumnStatistics(dbName, tableName, colName); + boolean succ = rawStore.deleteTableColumnStatistics(dbName, tblName, colName); if (succ) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(dbName, tblName)) { + return succ; + } SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if (sharedCache == null) return succ; try { // Wait if background cache update is happening tableColStatsCacheLock.readLock().lock(); isTableColStatsCacheDirty.set(true); - sharedCache.removeTableColStatsFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), colName); + sharedCache.removeTableColStatsFromCache(dbName, tblName, colName); } finally { tableColStatsCacheLock.readLock().unlock(); } @@ -1637,37 +1765,35 @@ public class CachedStore implements RawStore, Configurable { throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals); if (succ) { + String dbName = StringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()); + String tblName = StringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()); + if (!shouldCacheTable(dbName, tblName)) { + return succ; + } SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; - String dbName = colStats.getStatsDesc().getDbName(); - String tableName = colStats.getStatsDesc().getTableName(); + if (sharedCache == null) return succ; List statsObjs = colStats.getStatsObj(); - Partition part = getPartition(dbName, tableName, partVals); + Partition part = getPartition(dbName, tblName, partVals); List colNames = new ArrayList<>(); for (ColumnStatisticsObj statsObj : statsObjs) { colNames.add(statsObj.getColName()); } StatsSetupConst.setColumnStatsState(part.getParameters(), colNames); - // Update partition try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - sharedCache.alterPartitionInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), partVals, part); + sharedCache.alterPartitionInCache(dbName, tblName, partVals, part); } finally { partitionCacheLock.readLock().unlock(); } - // Update partition column stats try { // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - sharedCache.updatePartitionColStatsInCache( - StringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()), - StringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals, + sharedCache.updatePartitionColStatsInCache(dbName, tblName, partVals, colStats.getStatsObj()); } finally { partitionColStatsCacheLock.readLock().unlock(); @@ -1678,27 +1804,30 @@ public class CachedStore implements RawStore, Configurable { @Override // TODO: calculate from cached values. - // Need to see if it makes sense to do this as some col stats maybe out of date/missing on cache. public List getPartitionColumnStatistics(String dbName, String tblName, List partNames, List colNames) throws MetaException, NoSuchObjectException { return rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames); } @Override - public boolean deletePartitionColumnStatistics(String dbName, String tableName, String partName, - List partVals, String colName) throws NoSuchObjectException, MetaException, - InvalidObjectException, InvalidInputException { + public boolean deletePartitionColumnStatistics(String dbName, String tblName, String partName, + List partVals, String colName) + throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { boolean succ = - rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName); + rawStore.deletePartitionColumnStatistics(dbName, tblName, partName, partVals, colName); if (succ) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(dbName, tblName)) { + return succ; + } SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if (sharedCache == null) return succ; try { // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - sharedCache.removePartitionColStatsFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), partVals, colName); + sharedCache.removePartitionColStatsFromCache(dbName, tblName, partVals, colName); } finally { partitionColStatsCacheLock.readLock().unlock(); } @@ -1708,14 +1837,16 @@ public class CachedStore implements RawStore, Configurable { @Override public AggrStats get_aggr_stats_for(String dbName, String tblName, List partNames, - List colNames) throws MetaException, NoSuchObjectException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + List colNames) throws MetaException, NoSuchObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); } - List colStats = mergeColStatsForPartitions( - StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), - partNames, colNames, sharedCache); + SharedCache sharedCache = sharedCacheWrapper.get(); + List colStats = + mergeColStatsForPartitions(StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), partNames, colNames, sharedCache); return new AggrStats(colStats, partNames.size()); } @@ -1821,6 +1952,11 @@ public class CachedStore implements RawStore, Configurable { public void dropPartitions(String dbName, String tblName, List partNames) throws MetaException, NoSuchObjectException { rawStore.dropPartitions(dbName, tblName, partNames); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(dbName, tblName)) { + return; + } SharedCache sharedCache = sharedCacheWrapper.get(); if (sharedCache == null) return; // Remove partitions @@ -1830,8 +1966,7 @@ public class CachedStore implements RawStore, Configurable { isPartitionCacheDirty.set(true); for (String partName : partNames) { List vals = partNameToVals(partName); - sharedCache.removePartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), vals); + sharedCache.removePartitionFromCache(dbName, tblName, vals); } } finally { partitionCacheLock.readLock().unlock(); @@ -1843,8 +1978,7 @@ public class CachedStore implements RawStore, Configurable { isPartitionColStatsCacheDirty.set(true); for (String partName : partNames) { List part_vals = partNameToVals(partName); - sharedCache.removePartitionColStatsFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), part_vals); + sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals); } } finally { partitionColStatsCacheLock.readLock().unlock(); @@ -2017,29 +2151,17 @@ public class CachedStore implements RawStore, Configurable { @Override public int getTableCount() throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getTableCount(); - } - return sharedCache.getCachedTableCount(); + return rawStore.getTableCount(); } @Override public int getPartitionCount() throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getPartitionCount(); - } - return sharedCache.getCachedPartitionCount(); + return rawStore.getPartitionCount(); } @Override public int getDatabaseCount() throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getDatabaseCount(); - } - return sharedCache.getCachedDatabaseCount(); + return rawStore.getDatabaseCount(); } @Override @@ -2072,14 +2194,17 @@ public class CachedStore implements RawStore, Configurable { } @Override - public List createTableWithConstraints(Table tbl, - List primaryKeys, List foreignKeys, - List uniqueConstraints, - List notNullConstraints) - throws InvalidObjectException, MetaException { + public List createTableWithConstraints(Table tbl, List primaryKeys, + List foreignKeys, List uniqueConstraints, + List notNullConstraints) throws InvalidObjectException, MetaException { // TODO constraintCache - List constraintNames = rawStore.createTableWithConstraints(tbl, primaryKeys, foreignKeys, - uniqueConstraints, notNullConstraints); + List constraintNames = rawStore.createTableWithConstraints(tbl, primaryKeys, + foreignKeys, uniqueConstraints, notNullConstraints); + String dbName = StringUtils.normalizeIdentifier(tbl.getDbName()); + String tblName = StringUtils.normalizeIdentifier(tbl.getTableName()); + if (!shouldCacheTable(dbName, tblName)) { + return constraintNames; + } SharedCache sharedCache = sharedCacheWrapper.get(); if (sharedCache == null) return constraintNames; sharedCache.addTableToCache(StringUtils.normalizeIdentifier(tbl.getDbName()), @@ -2152,7 +2277,7 @@ public class CachedStore implements RawStore, Configurable { private final SharedCache instance = new SharedCache(); private final Object initLock = new Object(); - private InitState initState = InitState.NOT_ENABLED; + private volatile InitState initState = InitState.NOT_ENABLED; // We preserve the old setConf init behavior, where a failed prewarm would fail the query // and give a chance to another query to try prewarming again. Basically, we'd increment the // count and all the queries waiting for prewarm would fail; however, we will retry the prewarm @@ -2171,7 +2296,7 @@ public class CachedStore implements RawStore, Configurable { if (isSuccessful) { initState = InitState.INITIALIZED; } else if (isFatal) { - initState = InitState.FAILED_FATAL; + initState = InitState.FAILED_FATAL; lastError = error; } else { ++initFailureCount; @@ -2234,6 +2359,20 @@ public class CachedStore implements RawStore, Configurable { } } } + + /** + * Notify all threads blocked on initialization, to continue work. (We allow read while prewarm + * is running; all write calls are blocked using waitForInitAndBlock). + */ + void notifyAllBlocked() { + synchronized (initLock) { + initLock.notifyAll(); + } + } + + boolean isInitialized() { + return initState.equals(InitState.INITIALIZED); + } } @VisibleForTesting @@ -2305,4 +2444,71 @@ public class CachedStore implements RawStore, Configurable { throws NoSuchObjectException, MetaException { return rawStore.getTriggersForResourcePlan(resourcePlanName); } + + static boolean isNotInBlackList(String dbName, String tblName) { + String str = dbName + "." + tblName; + for (Pattern pattern : blacklistPatterns) { + LOG.debug("Trying to match: {} against blacklist pattern: {}", str, pattern); + Matcher matcher = pattern.matcher(str); + if (matcher.matches()) { + LOG.debug("Found matcher group: {} at start index: {} and end index: {}", matcher.group(), + matcher.start(), matcher.end()); + return false; + } + } + return true; + } + + static boolean isInWhitelist(String dbName, String tblName) { + String str = dbName + "." + tblName; + for (Pattern pattern : whitelistPatterns) { + LOG.debug("Trying to match: {} against whitelist pattern: {}", str, pattern); + Matcher matcher = pattern.matcher(str); + if (matcher.matches()) { + LOG.debug("Found matcher group: {} at start index: {} and end index: {}", matcher.group(), + matcher.start(), matcher.end()); + return true; + } + } + return false; + } + + // For testing + static void setWhitelistPattern(List patterns) { + whitelistPatterns = patterns; + } + + // For testing + static void setBlacklistPattern(List patterns) { + blacklistPatterns = patterns; + } + + // Determines if we should cache a table (& its partitions, stats etc), + // based on whitelist/blacklist + static boolean shouldCacheTable(String dbName, String tblName) { + if (!isNotInBlackList(dbName, tblName)) { + LOG.debug("{}.{} is in blacklist, skipping", dbName, tblName); + return false; + } + if (!isInWhitelist(dbName, tblName)) { + LOG.debug("{}.{} is not in whitelist, skipping", dbName, tblName); + return false; + } + return true; + } + + static List createPatterns(String configStr) { + List patternStrs = Arrays.asList(configStr.split(",")); + List patterns = new ArrayList(); + for (String str : patternStrs) { + patterns.add(Pattern.compile(str)); + } + return patterns; + } + + static boolean isBlacklistWhitelistEmpty(Configuration conf) { + return MetastoreConf.getAsString(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST) + .equals(".*") + && MetastoreConf.getAsString(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST).isEmpty(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/f1698b62/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java index a76b848..b606779 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -318,6 +318,28 @@ public class SharedCache { return wrapper.getPartition(); } + /** + * Given a db + table, remove all partitions for this table from the cache + * @param dbName + * @param tblName + * @return + */ + public synchronized void removePartitionsFromCache(String dbName, String tblName) { + String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); + Iterator> iterator = partitionCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + String key = entry.getKey(); + PartitionWrapper wrapper = entry.getValue(); + if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { + iterator.remove(); + if (wrapper.getSdHash() != null) { + decrSd(wrapper.getSdHash()); + } + } + } + } + // Remove cached column stats for all partitions of a table public synchronized void removePartitionColStatsFromCache(String dbName, String tblName) { String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); http://git-wip-us.apache.org/repos/asf/hive/blob/f1698b62/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 8e35d44..fb904ab 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -273,6 +273,17 @@ public class MetastoreConf { CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY("metastore.cached.rawstore.cache.update.frequency", "hive.metastore.cached.rawstore.cache.update.frequency", 60, TimeUnit.SECONDS, "The time after which metastore cache is updated from metastore DB."), + CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST("metastore.cached.rawstore.cached.object.whitelist", + "hive.metastore.cached.rawstore.cached.object.whitelist", ".*", "Comma separated list of regular expressions \n " + + "to select the tables (and its partitions, stats etc) that will be cached by CachedStore. \n" + + "This can be used in conjunction with hive.metastore.cached.rawstore.cached.object.blacklist. \n" + + "Example: .*, db1.*, db2\\.tbl.*. The last item can potentially override patterns specified before."), + CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST("metastore.cached.rawstore.cached.object.blacklist", + "hive.metastore.cached.rawstore.cached.object.blacklist", "", "Comma separated list of regular expressions \n " + + "to filter out the tables (and its partitions, stats etc) that will be cached by CachedStore. \n" + + "This can be used in conjunction with hive.metastore.cached.rawstore.cached.object.whitelist. \n" + + "Example: db2.*, db3\\.tbl1, db3\\..*. The last item can potentially override patterns specified before. \n" + + "The blacklist also overrides the whitelist."), CAPABILITY_CHECK("metastore.client.capability.check", "hive.metastore.client.capability.check", true, "Whether to check client capabilities for potentially breaking API usage."), http://git-wip-us.apache.org/repos/asf/hive/blob/f1698b62/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java index 50e4244..1dd3e7e 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java @@ -307,10 +307,13 @@ public class MetaStoreUtils { // Note all maps and lists have to be absolutely sorted. Otherwise we'll produce different // results for hashes based on the OS or JVM being used. md.reset(); - for (FieldSchema fs : sd.getCols()) { - md.update(fs.getName().getBytes(ENCODING)); - md.update(fs.getType().getBytes(ENCODING)); - if (fs.getComment() != null) md.update(fs.getComment().getBytes(ENCODING)); + // In case cols are null + if (sd.getCols() != null) { + for (FieldSchema fs : sd.getCols()) { + md.update(fs.getName().getBytes(ENCODING)); + md.update(fs.getType().getBytes(ENCODING)); + if (fs.getComment() != null) md.update(fs.getComment().getBytes(ENCODING)); + } } if (sd.getInputFormat() != null) { md.update(sd.getInputFormat().getBytes(ENCODING));