hive-commits mailing list archives

From w...@apache.org
Subject [37/54] [abbrv] hive git commit: HIVE-16579: CachedStore: improvements to partition col stats caching and cache column stats for unpartitioned table (Daniel Dai, Thejas Nair, Vaibhav Gumashta reviewed by Daniel Dai, Thejas Nair)
Date Wed, 24 May 2017 23:52:11 GMT
HIVE-16579: CachedStore: improvements to partition col stats caching and cache column stats for unpartitioned table (Daniel Dai, Thejas Nair, Vaibhav Gumashta reviewed by Daniel Dai, Thejas Nair)

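At the heart of the patch, the per-partition stats cache changes shape: instead of one NDV-adjusted aggregate ColumnStatisticsObj per (db, table, partition, column) key, the cache now stores the raw stats of every column in a partition, keyed by partition name. A sketch of the new shape, with hypothetical values:

  Map<String, List<ColumnStatisticsObj>> colStatsPerPartition =
      new HashMap<String, List<ColumnStatisticsObj>>();
  ColumnStatisticsObj amountStats = new ColumnStatisticsObj();
  amountStats.setColName("amount");                 // Thrift-generated setters
  amountStats.setColType("double");
  colStatsPerPartition.put("ds=2017-05-22", Arrays.asList(amountStats));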

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d85beaa9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d85beaa9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d85beaa9

Branch: refs/heads/hive-14535
Commit: d85beaa99ba349d9334d3d96abb6e89c94db8481
Parents: 952fe6e
Author: Vaibhav Gumashta <vgumashta@hortonworks.com>
Authored: Mon May 22 15:52:58 2017 -0700
Committer: Vaibhav Gumashta <vgumashta@hortonworks.com>
Committed: Mon May 22 15:52:58 2017 -0700

----------------------------------------------------------------------
 .../listener/DummyRawStoreFailEvent.java        |   4 +-
 .../org/apache/hadoop/hive/ql/QTestUtil.java    |   2 +-
 .../hive/metastore/MetaStoreDirectSql.java      |  73 +-
 .../hadoop/hive/metastore/MetaStoreUtils.java   |  11 +-
 .../hadoop/hive/metastore/ObjectStore.java      |  19 +-
 .../apache/hadoop/hive/metastore/RawStore.java  |   8 +-
 .../hive/metastore/StatObjectConverter.java     | 148 +++
 .../hadoop/hive/metastore/cache/CacheUtils.java |  31 +
 .../hive/metastore/cache/CachedStore.java       | 943 ++++++++++++-------
 .../hive/metastore/cache/SharedCache.java       | 293 +++++-
 .../hadoop/hive/metastore/hbase/HBaseStore.java |   2 +-
 .../stats/merge/ColumnStatsMergerFactory.java   |  18 +-
 .../stats/merge/DateColumnStatsMerger.java      |  55 ++
 .../DummyRawStoreControlledCommit.java          |   2 +-
 .../DummyRawStoreForJdoConnection.java          |   2 +-
 .../hive/metastore/cache/TestCachedStore.java   | 450 ++++++++-
 16 files changed, 1637 insertions(+), 424 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 91a3a38..3dc63bd 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -914,9 +914,9 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
   }
 
   @Override
-  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+  public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
       String tableName) throws MetaException, NoSuchObjectException {
-    return objectStore.getAggrColStatsForTablePartitions(dbName, tableName);
+    return objectStore.getColStatsForTablePartitions(dbName, tableName);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index d296851..111cc11 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -350,7 +350,7 @@ public class QTestUtil {
     if (!useHBaseMetastore) {
       // Plug verifying metastore in for testing DirectSQL.
       conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL,
-        "org.apache.hadoop.hive.metastore.VerifyingObjectStore");
+          "org.apache.hadoop.hive.metastore.VerifyingObjectStore");
     } else {
       conf.setVar(ConfVars.METASTORE_RAW_STORE_IMPL, HBaseStore.class.getName());
       conf.setBoolVar(ConfVars.METASTORE_FASTPATH, true);

http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index b96c27e..df73693 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -1208,7 +1208,9 @@ class MetaStoreDirectSql {
       }
     };
     List<Object[]> list = runBatched(colNames, b);
-    if (list.isEmpty()) return null;
+    if (list.isEmpty()) {
+      return null;
+    }
     ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName);
     ColumnStatistics result = makeColumnStats(list, csd, 0);
     b.closeAllQueries();
@@ -1343,41 +1345,26 @@ class MetaStoreDirectSql {
 
   // Get aggregated column stats for a table per partition for all columns in the partition
   // This is primarily used to populate stats object when using CachedStore (Check CachedStore#prewarm)
-  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
-      String tblName, boolean useDensityFunctionForNDVEstimation, double ndvTuner) throws MetaException {
-    String queryText = "select \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", "
-        + "min(\"LONG_LOW_VALUE\"), max(\"LONG_HIGH_VALUE\"), min(\"DOUBLE_LOW_VALUE\"), max(\"DOUBLE_HIGH_VALUE\"), "
-        + "min(cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal)), max(cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)), "
-        + "sum(\"NUM_NULLS\"), max(\"NUM_DISTINCTS\"), "
-        + "max(\"AVG_COL_LEN\"), max(\"MAX_COL_LEN\"), sum(\"NUM_TRUES\"), sum(\"NUM_FALSES\"), "
-        // The following data is used to compute a partitioned table's NDV based
-        // on partitions' NDV when useDensityFunctionForNDVEstimation = true. Global NDVs cannot be
-        // accurately derived from partition NDVs, because the domain of column value two partitions
-        // can overlap. If there is no overlap then global NDV is just the sum
-        // of partition NDVs (UpperBound). But if there is some overlay then
-        // global NDV can be anywhere between sum of partition NDVs (no overlap)
-        // and same as one of the partition NDV (domain of column value in all other
-        // partitions is subset of the domain value in one of the partition)
-        // (LowerBound).But under uniform distribution, we can roughly estimate the global
-        // NDV by leveraging the min/max values.
-        // And, we also guarantee that the estimation makes sense by comparing it to the
-        // UpperBound (calculated by "sum(\"NUM_DISTINCTS\")")
-        // and LowerBound (calculated by "max(\"NUM_DISTINCTS\")")
-        + "avg((\"LONG_HIGH_VALUE\"-\"LONG_LOW_VALUE\")/cast(\"NUM_DISTINCTS\" as decimal)),"
-        + "avg((\"DOUBLE_HIGH_VALUE\"-\"DOUBLE_LOW_VALUE\")/\"NUM_DISTINCTS\"),"
-        + "avg((cast(\"BIG_DECIMAL_HIGH_VALUE\" as decimal)-cast(\"BIG_DECIMAL_LOW_VALUE\" as decimal))/\"NUM_DISTINCTS\"),"
-        + "sum(\"NUM_DISTINCTS\") from \"PART_COL_STATS\""
-        + " where \"DB_NAME\" = ? and \"TABLE_NAME\" = ? group by \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\"";
+  public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
+      String tblName) throws MetaException {
+    String queryText =
+        "select \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", \"LONG_LOW_VALUE\", "
+            + "\"LONG_HIGH_VALUE\", \"DOUBLE_LOW_VALUE\", \"DOUBLE_HIGH_VALUE\",  "
+            + "\"BIG_DECIMAL_LOW_VALUE\", \"BIG_DECIMAL_HIGH_VALUE\", \"NUM_NULLS\", "
+            + "\"NUM_DISTINCTS\", \"AVG_COL_LEN\", \"MAX_COL_LEN\", \"NUM_TRUES\", \"NUM_FALSES\""
+            + " from \"PART_COL_STATS\" where \"DB_NAME\" = ? and \"TABLE_NAME\" = ?"
+            + " order by \"PARTITION_NAME\"";
     long start = 0;
     long end = 0;
     Query query = null;
     boolean doTrace = LOG.isDebugEnabled();
     Object qResult = null;
-    ForwardQueryResult fqr = null;
     start = doTrace ? System.nanoTime() : 0;
     query = pm.newQuery("javax.jdo.query.SQL", queryText);
-    qResult = executeWithArray(query,
-        prepareParams(dbName, tblName, new ArrayList<String>(), new ArrayList<String>()), queryText);
+    qResult =
+        executeWithArray(query,
+            prepareParams(dbName, tblName, new ArrayList<String>(), new ArrayList<String>()),
+            queryText);
     if (qResult == null) {
       query.closeAll();
       return Maps.newHashMap();
@@ -1385,13 +1372,31 @@ class MetaStoreDirectSql {
     end = doTrace ? System.nanoTime() : 0;
     timingTrace(doTrace, queryText, start, end);
     List<Object[]> list = ensureList(qResult);
-    Map<String, ColumnStatisticsObj> partColStatsMap = new HashMap<String, ColumnStatisticsObj>();
+    Map<String, List<ColumnStatisticsObj>> partColStatsMap =
+        new HashMap<String, List<ColumnStatisticsObj>>();
+    String partNameCurrent = null;
+    List<ColumnStatisticsObj> partColStatsList = new ArrayList<ColumnStatisticsObj>();
     for (Object[] row : list) {
       String partName = (String) row[0];
-      String colName = (String) row[1];
-      partColStatsMap.put(
-          CacheUtils.buildKey(dbName, tblName, CachedStore.partNameToVals(partName), colName),
-          prepareCSObjWithAdjustedNDV(row, 1, useDensityFunctionForNDVEstimation, ndvTuner));
+      if (partNameCurrent == null) {
+        // Update the current partition we are working on
+        partNameCurrent = partName;
+        // Create a new list for this new partition
+        partColStatsList = new ArrayList<ColumnStatisticsObj>();
+        // Add the col stat for the current column
+        partColStatsList.add(prepareCSObj(row, 1));
+      } else if (!partNameCurrent.equalsIgnoreCase(partName)) {
+        // Save the previous partition and its col stat list
+        partColStatsMap.put(partNameCurrent, partColStatsList);
+        // Update the current partition we are working on
+        partNameCurrent = partName;
+        // Create a new list for this new partition
+        partColStatsList = new ArrayList<ColumnStatisticsObj>();
+        // Add the col stat for the current column
+        partColStatsList.add(prepareCSObj(row, 1));
+      } else {
+        partColStatsList.add(prepareCSObj(row, 1));
+      }
       Deadline.checkTimeout();
     }
     query.closeAll();

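The replacement loop above relies on the SQL ORDER BY "PARTITION_NAME" to group rows into one stats list per partition. A condensed sketch of the same grouping pattern, including a flush of the trailing partition after the loop (written as if inside MetaStoreDirectSql, where prepareCSObj is available):

  Map<String, List<ColumnStatisticsObj>> byPartition =
      new HashMap<String, List<ColumnStatisticsObj>>();
  String current = null;
  List<ColumnStatisticsObj> bucket = new ArrayList<ColumnStatisticsObj>();
  for (Object[] row : list) {             // rows arrive ordered by PARTITION_NAME
    String partName = (String) row[0];
    if (current != null && !current.equalsIgnoreCase(partName)) {
      byPartition.put(current, bucket);   // finished one partition; flush it
      bucket = new ArrayList<ColumnStatisticsObj>();
    }
    current = partName;
    bucket.add(prepareCSObj(row, 1));     // stats columns start after PARTITION_NAME
  }
  if (current != null) {
    byPartition.put(current, bucket);     // flush the last partition
  }
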
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index 870896c..8328428 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@ -1171,6 +1171,15 @@ public class MetaStoreUtils {
     return addCols(getSchemaWithoutCols(sd, tblsd, parameters, databaseName, tableName, partitionKeys), tblsd.getCols());
   }
 
+  public static List<String> getColumnNamesForTable(Table table) {
+    List<String> colNames = new ArrayList<String>();
+    Iterator<FieldSchema> colsIterator = table.getSd().getColsIterator();
+    while (colsIterator.hasNext()) {
+      colNames.add(colsIterator.next().getName());
+    }
+    return colNames;
+  }
+
   public static String getColumnNameDelimiter(List<FieldSchema> fieldSchemas) {
     // we first take a look if any fieldSchemas contain COMMA
     for (int i = 0; i < fieldSchemas.size(); i++) {
@@ -1180,7 +1189,7 @@ public class MetaStoreUtils {
     }
     return String.valueOf(SerDeUtils.COMMA);
   }
-  
+
   /**
    * Convert FieldSchemas to columnNames.
    */

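The new helper exists so that CachedStore#prewarm (later in this patch) can request table-level stats for every column in one call; its usage there, minus identifier normalization, looks like:

  List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
  ColumnStatistics tableColStats =
      rawStore.getTableColumnStatistics(dbName, tblName, colNames);
  if (tableColStats != null && tableColStats.getStatsObjSize() > 0) {
    SharedCache.addTableColStatsToCache(dbName, tblName, tableColStats.getStatsObj());
  }
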
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index b28983f..19becb8 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -7173,23 +7173,18 @@ public class ObjectStore implements RawStore, Configurable {
   }
 
   @Override
-  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+  public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
       String tableName) throws MetaException, NoSuchObjectException {
-    final boolean useDensityFunctionForNDVEstimation = HiveConf.getBoolVar(getConf(),
-        HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION);
-    final double ndvTuner = HiveConf.getFloatVar(getConf(),
-        HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_TUNER);
-    return new GetHelper<Map<String, ColumnStatisticsObj>>(dbName, tableName, true, false) {
+    return new GetHelper<Map<String, List<ColumnStatisticsObj>>>(dbName, tableName, true, false) {
       @Override
-      protected Map<String, ColumnStatisticsObj> getSqlResult(
-          GetHelper<Map<String, ColumnStatisticsObj>> ctx) throws MetaException {
-        return directSql.getAggrColStatsForTablePartitions(dbName, tblName,
-            useDensityFunctionForNDVEstimation, ndvTuner);
+      protected Map<String, List<ColumnStatisticsObj>> getSqlResult(
+          GetHelper<Map<String, List<ColumnStatisticsObj>>> ctx) throws MetaException {
+        return directSql.getColStatsForTablePartitions(dbName, tblName);
       }
 
       @Override
-      protected Map<String, ColumnStatisticsObj> getJdoResult(
-          GetHelper<Map<String, ColumnStatisticsObj>> ctx) throws MetaException,
+      protected Map<String, List<ColumnStatisticsObj>> getJdoResult(
+          GetHelper<Map<String, List<ColumnStatisticsObj>>> ctx) throws MetaException,
           NoSuchObjectException {
         // This is fast path for query optimizations, if we can find this info
         // quickly using directSql, do it. No point in failing back to slow path

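Both overrides plug into ObjectStore's GetHelper, which tries the hand-written direct-SQL path first and falls back to JDO only when allowed. A stripped-down sketch of that shape (illustrative only; the real GetHelper also manages the surrounding transaction):

  // Illustrative shape of ObjectStore.GetHelper (not the real signature).
  abstract class Getter<T> {
    abstract T getSqlResult() throws MetaException;  // fast path: hand-written SQL
    abstract T getJdoResult() throws MetaException;  // slow path: JDO/DataNucleus
    T run(boolean allowJdo) throws MetaException {
      try {
        return getSqlResult();
      } catch (Exception e) {
        if (!allowJdo) {
          throw new MetaException(e.getMessage());
        }
        return getJdoResult();                       // fall back to the slow path
      }
    }
  }
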
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
index c1af690..964ffb2 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -579,14 +579,16 @@ public interface RawStore extends Configurable {
     List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException;
 
   /**
-   * Get all partition column statistics for a table
+   * Get all partition column statistics for a table in a db
+   *
    * @param dbName
    * @param tableName
-   * @return Map of partition column statistics
+   * @return Map of partition column statistics. The key is the partition name; the value is a
+   *         list of column stat objects, one for each column in the partition
    * @throws MetaException
    * @throws NoSuchObjectException
    */
-  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+  public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
       String tableName) throws MetaException, NoSuchObjectException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
index fcf6f27..2dc2804 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/StatObjectConverter.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields;
 import org.apache.hadoop.hive.metastore.model.MPartition;
 import org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics;
 import org.apache.hadoop.hive.metastore.model.MTable;
@@ -700,4 +701,151 @@ public class StatObjectConverter {
     return new BigDecimal(new BigInteger(d.getUnscaled()), d.getScale()).toString();
   }
 
+  /**
+   * Set field values in oldStatObj from newStatObj
+   * @param oldStatObj
+   * @param newStatObj
+   */
+  public static void setFieldsIntoOldStats(ColumnStatisticsObj oldStatObj,
+      ColumnStatisticsObj newStatObj) {
+    _Fields typeNew = newStatObj.getStatsData().getSetField();
+    _Fields typeOld = oldStatObj.getStatsData().getSetField();
+    typeNew = typeNew == typeOld ? typeNew : null;
+    switch (typeNew) {
+    case BOOLEAN_STATS:
+      BooleanColumnStatsData oldBooleanStatsData = oldStatObj.getStatsData().getBooleanStats();
+      BooleanColumnStatsData newBooleanStatsData = newStatObj.getStatsData().getBooleanStats();
+      if (newBooleanStatsData.isSetNumTrues()) {
+        oldBooleanStatsData.setNumTrues(newBooleanStatsData.getNumTrues());
+      }
+      if (newBooleanStatsData.isSetNumFalses()) {
+        oldBooleanStatsData.setNumFalses(newBooleanStatsData.getNumFalses());
+      }
+      if (newBooleanStatsData.isSetNumNulls()) {
+        oldBooleanStatsData.setNumNulls(newBooleanStatsData.getNumNulls());
+      }
+      if (newBooleanStatsData.isSetBitVectors()) {
+        oldBooleanStatsData.setBitVectors(newBooleanStatsData.getBitVectors());
+      }
+      break;
+    case LONG_STATS: {
+      LongColumnStatsData oldLongStatsData = oldStatObj.getStatsData().getLongStats();
+      LongColumnStatsData newLongStatsData = newStatObj.getStatsData().getLongStats();
+      if (newLongStatsData.isSetHighValue()) {
+        oldLongStatsData.setHighValue(newLongStatsData.getHighValue());
+      }
+      if (newLongStatsData.isSetLowValue()) {
+        oldLongStatsData.setLowValue(newLongStatsData.getLowValue());
+      }
+      if (newLongStatsData.isSetNumNulls()) {
+        oldLongStatsData.setNumNulls(newLongStatsData.getNumNulls());
+      }
+      if (newLongStatsData.isSetNumDVs()) {
+        oldLongStatsData.setNumDVs(newLongStatsData.getNumDVs());
+      }
+      if (newLongStatsData.isSetBitVectors()) {
+        oldLongStatsData.setBitVectors(newLongStatsData.getBitVectors());
+      }
+      break;
+    }
+    case DOUBLE_STATS: {
+      DoubleColumnStatsData oldDoubleStatsData = oldStatObj.getStatsData().getDoubleStats();
+      DoubleColumnStatsData newDoubleStatsData = newStatObj.getStatsData().getDoubleStats();
+      if (newDoubleStatsData.isSetHighValue()) {
+        oldDoubleStatsData.setHighValue(newDoubleStatsData.getHighValue());
+      }
+      if (newDoubleStatsData.isSetLowValue()) {
+        oldDoubleStatsData.setLowValue(newDoubleStatsData.getLowValue());
+      }
+      if (newDoubleStatsData.isSetNumNulls()) {
+        oldDoubleStatsData.setNumNulls(newDoubleStatsData.getNumNulls());
+      }
+      if (newDoubleStatsData.isSetNumDVs()) {
+        oldDoubleStatsData.setNumDVs(newDoubleStatsData.getNumDVs());
+      }
+      if (newDoubleStatsData.isSetBitVectors()) {
+        oldDoubleStatsData.setBitVectors(newDoubleStatsData.getBitVectors());
+      }
+      break;
+    }
+    case STRING_STATS: {
+      StringColumnStatsData oldStringStatsData = oldStatObj.getStatsData().getStringStats();
+      StringColumnStatsData newStringStatsData = newStatObj.getStatsData().getStringStats();
+      if (newStringStatsData.isSetMaxColLen()) {
+        oldStringStatsData.setMaxColLen(newStringStatsData.getMaxColLen());
+      }
+      if (newStringStatsData.isSetAvgColLen()) {
+        oldStringStatsData.setAvgColLen(newStringStatsData.getAvgColLen());
+      }
+      if (newStringStatsData.isSetNumNulls()) {
+        oldStringStatsData.setNumNulls(newStringStatsData.getNumNulls());
+      }
+      if (newStringStatsData.isSetNumDVs()) {
+        oldStringStatsData.setNumDVs(newStringStatsData.getNumDVs());
+      }
+      if (newStringStatsData.isSetBitVectors()) {
+        oldStringStatsData.setBitVectors(newStringStatsData.getBitVectors());
+      }
+      break;
+    }
+    case BINARY_STATS:
+      BinaryColumnStatsData oldBinaryStatsData = oldStatObj.getStatsData().getBinaryStats();
+      BinaryColumnStatsData newBinaryStatsData = newStatObj.getStatsData().getBinaryStats();
+      if (newBinaryStatsData.isSetMaxColLen()) {
+        oldBinaryStatsData.setMaxColLen(newBinaryStatsData.getMaxColLen());
+      }
+      if (newBinaryStatsData.isSetAvgColLen()) {
+        oldBinaryStatsData.setAvgColLen(newBinaryStatsData.getAvgColLen());
+      }
+      if (newBinaryStatsData.isSetNumNulls()) {
+        oldBinaryStatsData.setNumNulls(newBinaryStatsData.getNumNulls());
+      }
+      if (newBinaryStatsData.isSetBitVectors()) {
+        oldBinaryStatsData.setBitVectors(newBinaryStatsData.getBitVectors());
+      }
+      break;
+    case DECIMAL_STATS: {
+      DecimalColumnStatsData oldDecimalStatsData = oldStatObj.getStatsData().getDecimalStats();
+      DecimalColumnStatsData newDecimalStatsData = newStatObj.getStatsData().getDecimalStats();
+      if (newDecimalStatsData.isSetHighValue()) {
+        oldDecimalStatsData.setHighValue(newDecimalStatsData.getHighValue());
+      }
+      if (newDecimalStatsData.isSetLowValue()) {
+        oldDecimalStatsData.setLowValue(newDecimalStatsData.getLowValue());
+      }
+      if (newDecimalStatsData.isSetNumNulls()) {
+        oldDecimalStatsData.setNumNulls(newDecimalStatsData.getNumNulls());
+      }
+      if (newDecimalStatsData.isSetNumDVs()) {
+        oldDecimalStatsData.setNumDVs(newDecimalStatsData.getNumDVs());
+      }
+      if (newDecimalStatsData.isSetBitVectors()) {
+        oldDecimalStatsData.setBitVectors(newDecimalStatsData.getBitVectors());
+      }
+      break;
+    }
+    case DATE_STATS: {
+      DateColumnStatsData oldDateStatsData = oldStatObj.getStatsData().getDateStats();
+      DateColumnStatsData newDateStatsData = newStatObj.getStatsData().getDateStats();
+      if (newDateStatsData.isSetHighValue()) {
+        oldDateStatsData.setHighValue(newDateStatsData.getHighValue());
+      }
+      if (newDateStatsData.isSetLowValue()) {
+        oldDateStatsData.setLowValue(newDateStatsData.getLowValue());
+      }
+      if (newDateStatsData.isSetNumNulls()) {
+        oldDateStatsData.setNumNulls(newDateStatsData.getNumNulls());
+      }
+      if (newDateStatsData.isSetNumDVs()) {
+        oldDateStatsData.setNumDVs(newDateStatsData.getNumDVs());
+      }
+      if (newDateStatsData.isSetBitVectors()) {
+        oldDateStatsData.setBitVectors(newDateStatsData.getBitVectors());
+      }
+      break;
+    }
+    default:
+      throw new IllegalArgumentException("Unknown stats type: " + typeNew.toString());
+    }
+  }
 }

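setFieldsIntoOldStats copies a field from the new stats object only when that field is explicitly set, so a sparse update cannot clobber existing values. A minimal standalone sketch of the pattern for the LONG_STATS case (with an explicit guard where the method above instead nulls typeNew on a type mismatch):

  static void mergeLongStats(ColumnStatisticsObj oldObj, ColumnStatisticsObj newObj) {
    if (oldObj.getStatsData().getSetField() != newObj.getStatsData().getSetField()) {
      throw new IllegalArgumentException("Stats type mismatch");   // guard first
    }
    LongColumnStatsData oldData = oldObj.getStatsData().getLongStats();
    LongColumnStatsData newData = newObj.getStatsData().getLongStats();
    if (newData.isSetHighValue()) {     // copy only fields the writer actually set
      oldData.setHighValue(newData.getHighValue());
    }
    if (newData.isSetNumNulls()) {
      oldData.setNumNulls(newData.getNumNulls());
    }
  }
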
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
index 668499b..280655d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
@@ -38,6 +38,10 @@ public class CacheUtils {
     return dbName + delimit + tableName;
   }
 
+  public static String buildKeyWithDelimit(String dbName, String tableName) {
+    return buildKey(dbName, tableName) + delimit;
+  }
+
   public static String buildKey(String dbName, String tableName, List<String> partVals) {
     String key = buildKey(dbName, tableName);
     if (partVals == null || partVals.size() == 0) {
@@ -52,11 +56,38 @@ public class CacheUtils {
     return key;
   }
 
+  public static String buildKeyWithDelimit(String dbName, String tableName, List<String> partVals) {
+    return buildKey(dbName, tableName, partVals) + delimit;
+  }
+
   public static String buildKey(String dbName, String tableName, List<String> partVals, String colName) {
     String key = buildKey(dbName, tableName, partVals);
     return key + delimit + colName;
   }
 
+  public static String buildKey(String dbName, String tableName, String colName) {
+    String key = buildKey(dbName, tableName);
+    return key + delimit + colName;
+  }
+
+  public static String[] splitTableColStats(String key) {
+    return key.split(delimit);
+  }
+
+  public static Object[] splitPartitionColStats(String key) {
+    Object[] result = new Object[4];
+    String[] comps = key.split(delimit);
+    result[0] = comps[0];
+    result[1] = comps[1];
+    List<String> vals = new ArrayList<String>();
+    for (int i=2;i<comps.length-2;i++) {
+      vals.add(comps[i]);
+    }
+    result[2] = vals;
+    result[3] = comps[comps.length-1];
+    return result;
+  }
+
   public static Table assemble(TableWrapper wrapper) {
     Table t = wrapper.getTable().deepCopy();
     if (wrapper.getSdHash()!=null) {

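The cache keys are delimiter-joined components: dbName, tableName, any partition values, then the column name. A round-trip sketch, assuming a single-character delimiter (the real delimit constant is defined near the top of CacheUtils):

  String delimit = "\u0001";                        // hypothetical; see CacheUtils#delimit
  String key = "default" + delimit + "sales" + delimit + "2017" + delimit + "05"
      + delimit + "amount";                         // db, table, partVals..., column
  String[] comps = key.split(delimit);
  String dbName = comps[0];                         // "default"
  String tblName = comps[1];                        // "sales"
  String colName = comps[comps.length - 1];         // "amount"
  List<String> partVals = new ArrayList<String>(    // ["2017", "05"]
      Arrays.asList(comps).subList(2, comps.length - 1));
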
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index 1cc838f..78aab91 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -26,12 +26,15 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Deadline;
 import org.apache.hadoop.hive.metastore.FileMetadataHandler;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.ObjectStore;
@@ -41,18 +44,11 @@ import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
-import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Date;
-import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.Decimal;
-import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
 import org.apache.hadoop.hive.metastore.api.Function;
@@ -61,7 +57,6 @@ import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.metastore.api.InvalidInputException;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
-import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -77,13 +72,14 @@ import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant;
 import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
 import org.apache.hadoop.hive.metastore.api.Type;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.hbase.stats.merge.ColumnStatsMerger;
+import org.apache.hadoop.hive.metastore.hbase.stats.merge.ColumnStatsMergerFactory;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -104,15 +100,25 @@ import com.google.common.annotations.VisibleForTesting;
 // TODO initial load slow?
 // TODO size estimation
 // TODO factor in extrapolation logic (using partitions found) during aggregate stats calculation
-// TODO factor in NDV estimation (density based estimation) logic when merging NDVs from 2 colStats object
-// TODO refactor to use same common code with StatObjectConverter (for merging 2 col stats objects)
 
 public class CachedStore implements RawStore, Configurable {
   private static ScheduledExecutorService cacheUpdateMaster = null;
-  private static AtomicReference<Thread> runningMasterThread = new AtomicReference<Thread>(null);
+  private static ReentrantReadWriteLock databaseCacheLock = new ReentrantReadWriteLock(true);
+  private static AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false);
+  private static ReentrantReadWriteLock tableCacheLock = new ReentrantReadWriteLock(true);
+  private static AtomicBoolean isTableCacheDirty = new AtomicBoolean(false);
+  private static ReentrantReadWriteLock partitionCacheLock = new ReentrantReadWriteLock(true);
+  private static AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false);
+  private static ReentrantReadWriteLock tableColStatsCacheLock = new ReentrantReadWriteLock(true);
+  private static AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false);
+  private static ReentrantReadWriteLock partitionColStatsCacheLock = new ReentrantReadWriteLock(
+      true);
+  private static AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false);
   RawStore rawStore;
   Configuration conf;
   private PartitionExpressionProxy expressionProxy = null;
+  // Default value set to 100 milliseconds for test purposes
+  private long cacheRefreshPeriod = 100;
   static boolean firstTime = true;
 
   static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName());
@@ -209,6 +215,8 @@ public class CachedStore implements RawStore, Configurable {
         LOG.info("Prewarming CachedStore");
         prewarm();
         LOG.info("CachedStore initialized");
+        // Start the cache update master-worker threads
+        startCacheUpdateService();
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -216,7 +224,10 @@ public class CachedStore implements RawStore, Configurable {
     }
   }
 
-  private void prewarm() throws Exception {
+  @VisibleForTesting
+  void prewarm() throws Exception {
+    // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy
+    Deadline.registerIfNot(1000000);
     List<String> dbNames = rawStore.getAllDatabases();
     for (String dbName : dbNames) {
       Database db = rawStore.getDatabase(dbName);
@@ -226,35 +237,81 @@ public class CachedStore implements RawStore, Configurable {
         Table table = rawStore.getTable(dbName, tblName);
         SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(dbName),
             HiveStringUtils.normalizeIdentifier(tblName), table);
+        Deadline.startTimer("getPartitions");
         List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
+        Deadline.stopTimer();
         for (Partition partition : partitions) {
           SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName),
               HiveStringUtils.normalizeIdentifier(tblName), partition);
         }
-        Map<String, ColumnStatisticsObj> aggrStatsPerPartition = rawStore
-            .getAggrColStatsForTablePartitions(dbName, tblName);
-        SharedCache.addPartitionColStatsToCache(aggrStatsPerPartition);
+        // Cache partition column stats
+        Deadline.startTimer("getColStatsForTablePartitions");
+        Map<String, List<ColumnStatisticsObj>> colStatsPerPartition =
+            rawStore.getColStatsForTablePartitions(dbName, tblName);
+        Deadline.stopTimer();
+        if (colStatsPerPartition != null) {
+          SharedCache.addPartitionColStatsToCache(dbName, tblName, colStatsPerPartition);
+        }
+        // Cache table column stats
+        List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+        Deadline.startTimer("getTableColumnStatistics");
+        ColumnStatistics tableColStats =
+            rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+        Deadline.stopTimer();
+        if ((tableColStats != null) && (tableColStats.getStatsObjSize() > 0)) {
+          SharedCache.addTableColStatsToCache(HiveStringUtils.normalizeIdentifier(dbName),
+              HiveStringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj());
+        }
       }
     }
-    // Start the cache update master-worker threads
-    startCacheUpdateService();
   }
 
-  private synchronized void startCacheUpdateService() {
+  @VisibleForTesting
+  synchronized void startCacheUpdateService() {
     if (cacheUpdateMaster == null) {
       cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() {
         public Thread newThread(Runnable r) {
           Thread t = Executors.defaultThreadFactory().newThread(r);
+          t.setName("CachedStore-CacheUpdateService: Thread-" + t.getId());
           t.setDaemon(true);
           return t;
         }
       });
-      cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(this), 0, HiveConf
-          .getTimeVar(conf, HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY,
-              TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+      if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+        cacheRefreshPeriod =
+            HiveConf.getTimeVar(conf,
+                HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY,
+                TimeUnit.MILLISECONDS);
+      }
+      LOG.info("CachedStore: starting cache update service (run every " + cacheRefreshPeriod + "ms");
+      cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(this), cacheRefreshPeriod,
+          cacheRefreshPeriod, TimeUnit.MILLISECONDS);
     }
   }
 
+  @VisibleForTesting
+  synchronized boolean stopCacheUpdateService(long timeout) {
+    boolean tasksStoppedBeforeShutdown = false;
+    if (cacheUpdateMaster != null) {
+      LOG.info("CachedStore: shutting down cache update service");
+      try {
+        tasksStoppedBeforeShutdown =
+            cacheUpdateMaster.awaitTermination(timeout, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        LOG.info("CachedStore: cache update service was interrupted while waiting for tasks to "
+            + "complete before shutting down. Will make a hard stop now.");
+      }
+      cacheUpdateMaster.shutdownNow();
+      cacheUpdateMaster = null;
+    }
+    return tasksStoppedBeforeShutdown;
+  }
+
+  @VisibleForTesting
+  void setCacheRefreshPeriod(long time) {
+    this.cacheRefreshPeriod = time;
+  }
+
   static class CacheUpdateMasterWork implements Runnable {
 
     private CachedStore cachedStore;
@@ -265,86 +322,175 @@ public class CachedStore implements RawStore, Configurable {
 
     @Override
     public void run() {
-      runningMasterThread.set(Thread.currentThread());
-      RawStore rawStore = cachedStore.getRawStore();
+      // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy
+      Deadline.registerIfNot(1000000);
+      LOG.debug("CachedStore: updating cached objects");
+      String rawStoreClassName =
+          HiveConf.getVar(cachedStore.conf, HiveConf.ConfVars.METASTORE_CACHED_RAW_STORE_IMPL,
+              ObjectStore.class.getName());
       try {
+        RawStore rawStore =
+            ((Class<? extends RawStore>) MetaStoreUtils.getClass(rawStoreClassName)).newInstance();
+        rawStore.setConf(cachedStore.conf);
         List<String> dbNames = rawStore.getAllDatabases();
-        // Update the database in cache
-        if (!updateDatabases(rawStore, dbNames)) {
-          return;
-        }
-        // Update the tables and their partitions in cache
-        if (!updateTables(rawStore, dbNames)) {
-          return;
+        if (dbNames != null) {
+          // Update the database in cache
+          updateDatabases(rawStore, dbNames);
+          for (String dbName : dbNames) {
+            // Update the tables in cache
+            updateTables(rawStore, dbName);
+            List<String> tblNames = cachedStore.getAllTables(dbName);
+            for (String tblName : tblNames) {
+              // Update the partitions for a table in cache
+              updateTablePartitions(rawStore, dbName, tblName);
+              // Update the table column stats for a table in cache
+              updateTableColStats(rawStore, dbName, tblName);
+              // Update the partitions column stats for a table in cache
+              updateTablePartitionColStats(rawStore, dbName, tblName);
+            }
+          }
         }
       } catch (MetaException e) {
         LOG.error("Updating CachedStore: error getting database names", e);
+      } catch (InstantiationException | IllegalAccessException e) {
+        throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e);
       }
     }
 
-    private boolean updateDatabases(RawStore rawStore, List<String> dbNames) {
-      if (dbNames != null) {
-        List<Database> databases = new ArrayList<Database>();
-        for (String dbName : dbNames) {
-          // If a preemption of this thread was requested, simply return before proceeding
-          if (Thread.interrupted()) {
-            return false;
+    private void updateDatabases(RawStore rawStore, List<String> dbNames) {
+      // Prepare the list of databases
+      List<Database> databases = new ArrayList<Database>();
+      for (String dbName : dbNames) {
+        Database db;
+        try {
+          db = rawStore.getDatabase(dbName);
+          databases.add(db);
+        } catch (NoSuchObjectException e) {
+          LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e);
+        }
+      }
+      // Update the cached database objects
+      try {
+        if (databaseCacheLock.writeLock().tryLock()) {
+          // Skip background updates if we detect change
+          if (isDatabaseCacheDirty.compareAndSet(true, false)) {
+            LOG.debug("Skipping database cache update; the database list we have is dirty.");
+            return;
           }
-          Database db;
-          try {
-            db = rawStore.getDatabase(dbName);
-            databases.add(db);
-          } catch (NoSuchObjectException e) {
-            LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e);
+          SharedCache.refreshDatabases(databases);
+        }
+      } finally {
+        if (databaseCacheLock.isWriteLockedByCurrentThread()) {
+          databaseCacheLock.writeLock().unlock();
+        }
+      }
+    }
+
+    // Update the cached table objects
+    private void updateTables(RawStore rawStore, String dbName) {
+      List<Table> tables = new ArrayList<Table>();
+      try {
+        List<String> tblNames = rawStore.getAllTables(dbName);
+        for (String tblName : tblNames) {
+          Table table =
+              rawStore.getTable(HiveStringUtils.normalizeIdentifier(dbName),
+                  HiveStringUtils.normalizeIdentifier(tblName));
+          tables.add(table);
+        }
+        if (tableCacheLock.writeLock().tryLock()) {
+          // Skip background updates if we detect change
+          if (isTableCacheDirty.compareAndSet(true, false)) {
+            LOG.debug("Skipping table cache update; the table list we have is dirty.");
+            return;
           }
+          SharedCache.refreshTables(dbName, tables);
+        }
+      } catch (MetaException e) {
+        LOG.info("Updating CachedStore: unable to read tables for database - " + dbName, e);
+      } finally {
+        if (tableCacheLock.isWriteLockedByCurrentThread()) {
+          tableCacheLock.writeLock().unlock();
         }
-        // Update the cached database objects
-        SharedCache.refreshDatabases(databases);
       }
-      return true;
     }
 
-    private boolean updateTables(RawStore rawStore, List<String> dbNames) {
-      if (dbNames != null) {
-        List<Table> tables = new ArrayList<Table>();
-        for (String dbName : dbNames) {
-          try {
-            List<String> tblNames = rawStore.getAllTables(dbName);
-            for (String tblName : tblNames) {
-              // If a preemption of this thread was requested, simply return before proceeding
-              if (Thread.interrupted()) {
-                return false;
-              }
-              Table table = rawStore.getTable(dbName, tblName);
-              tables.add(table);
-            }
-            // Update the cached database objects
-            SharedCache.refreshTables(dbName, tables);
-            for (String tblName : tblNames) {
-              // If a preemption of this thread was requested, simply return before proceeding
-              if (Thread.interrupted()) {
-                return false;
-              }
-              List<Partition> partitions =
-                  rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
-              SharedCache.refreshPartitions(dbName, tblName, partitions);
-            }
-          } catch (MetaException | NoSuchObjectException e) {
-            LOG.error("Updating CachedStore: unable to read table", e);
-            return false;
+    // Update the cached partition objects for a table
+    private void updateTablePartitions(RawStore rawStore, String dbName, String tblName) {
+      try {
+        Deadline.startTimer("getPartitions");
+        List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
+        Deadline.stopTimer();
+        if (partitionCacheLock.writeLock().tryLock()) {
+          // Skip background updates if we detect change
+          if (isPartitionCacheDirty.compareAndSet(true, false)) {
+            LOG.debug("Skipping partition cache update; the partition list we have is dirty.");
+            return;
           }
+          SharedCache.refreshPartitions(HiveStringUtils.normalizeIdentifier(dbName),
+              HiveStringUtils.normalizeIdentifier(tblName), partitions);
+        }
+      } catch (MetaException | NoSuchObjectException e) {
+        LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e);
+      } finally {
+        if (partitionCacheLock.isWriteLockedByCurrentThread()) {
+          partitionCacheLock.writeLock().unlock();
         }
       }
-      return true;
     }
-  }
 
-  // Interrupt the cache update background thread
-  // Fire and forget (the master will respond appropriately when it gets a chance)
-  // All writes to the cache go through synchronized methods, so fire & forget is fine.
-  private void interruptCacheUpdateMaster() {
-    if (runningMasterThread.get() != null) {
-      runningMasterThread.get().interrupt();
+    // Update the cached col stats for this table
+    private void updateTableColStats(RawStore rawStore, String dbName, String tblName) {
+      try {
+        Table table = rawStore.getTable(dbName, tblName);
+        List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+        Deadline.startTimer("getTableColumnStatistics");
+        ColumnStatistics tableColStats =
+            rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+        Deadline.stopTimer();
+        if (tableColStatsCacheLock.writeLock().tryLock()) {
+          // Skip background updates if we detect change
+          if (isTableColStatsCacheDirty.compareAndSet(true, false)) {
+            LOG.debug("Skipping table column stats cache update; the table column stats list we "
+                + "have is dirty.");
+            return;
+          }
+          SharedCache.refreshTableColStats(HiveStringUtils.normalizeIdentifier(dbName),
+              HiveStringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj());
+        }
+      } catch (MetaException | NoSuchObjectException e) {
+        LOG.info("Updating CachedStore: unable to read table column stats of table: " + tblName, e);
+      } finally {
+        if (tableColStatsCacheLock.isWriteLockedByCurrentThread()) {
+          tableColStatsCacheLock.writeLock().unlock();
+        }
+      }
+    }
+
+    // Update the cached partition col stats for a table
+    private void updateTablePartitionColStats(RawStore rawStore, String dbName, String tblName) {
+      try {
+        Deadline.startTimer("getColStatsForTablePartitions");
+        Map<String, List<ColumnStatisticsObj>> colStatsPerPartition =
+            rawStore.getColStatsForTablePartitions(dbName, tblName);
+        Deadline.stopTimer();
+        if (partitionColStatsCacheLock.writeLock().tryLock()) {
+          // Skip background updates if we detect change
+          if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) {
+            LOG.debug("Skipping partition column stats cache update; the partition column stats "
+                + "list we have is dirty.");
+            return;
+          }
+          SharedCache.refreshPartitionColStats(HiveStringUtils.normalizeIdentifier(dbName),
+              HiveStringUtils.normalizeIdentifier(tblName), colStatsPerPartition);
+        }
+      } catch (MetaException | NoSuchObjectException e) {
+        LOG.info("Updating CachedStore: unable to read partitions column stats of table: "
+            + tblName, e);
+      } finally {
+        if (partitionColStatsCacheLock.isWriteLockedByCurrentThread()) {
+          partitionColStatsCacheLock.writeLock().unlock();
+        }
+      }
     }
   }
 
@@ -374,11 +520,17 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public void createDatabase(Database db)
-      throws InvalidObjectException, MetaException {
+  public void createDatabase(Database db) throws InvalidObjectException, MetaException {
     rawStore.createDatabase(db);
-    interruptCacheUpdateMaster();
-    SharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(db.getName()), db.deepCopy());
+    try {
+      // Wait if background cache update is happening
+      databaseCacheLock.readLock().lock();
+      isDatabaseCacheDirty.set(true);
+      SharedCache.addDatabaseToCache(HiveStringUtils.normalizeIdentifier(db.getName()),
+          db.deepCopy());
+    } finally {
+      databaseCacheLock.readLock().unlock();
+    }
   }
 
   @Override
@@ -387,26 +539,38 @@ public class CachedStore implements RawStore, Configurable {
     if (db == null) {
       throw new NoSuchObjectException();
     }
-    return SharedCache.getDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbName));
+    return db;
   }
 
   @Override
   public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException {
     boolean succ = rawStore.dropDatabase(dbname);
     if (succ) {
-      interruptCacheUpdateMaster();
-      SharedCache.removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbname));
+      try {
+        // Wait if background cache update is happening
+        databaseCacheLock.readLock().lock();
+        isDatabaseCacheDirty.set(true);
+        SharedCache.removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbname));
+      } finally {
+        databaseCacheLock.readLock().unlock();
+      }
     }
     return succ;
   }
 
   @Override
-  public boolean alterDatabase(String dbName, Database db)
-      throws NoSuchObjectException, MetaException {
+  public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectException,
+      MetaException {
     boolean succ = rawStore.alterDatabase(dbName, db);
     if (succ) {
-      interruptCacheUpdateMaster();
-      SharedCache.alterDatabaseInCache(HiveStringUtils.normalizeIdentifier(dbName), db);
+      try {
+        // Wait if background cache update is happening
+        databaseCacheLock.readLock().lock();
+        isDatabaseCacheDirty.set(true);
+        SharedCache.alterDatabaseInCache(HiveStringUtils.normalizeIdentifier(dbName), db);
+      } finally {
+        databaseCacheLock.readLock().unlock();
+      }
     }
     return succ;
   }
@@ -462,24 +626,45 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public void createTable(Table tbl)
-      throws InvalidObjectException, MetaException {
+  public void createTable(Table tbl) throws InvalidObjectException, MetaException {
     rawStore.createTable(tbl);
-    interruptCacheUpdateMaster();
     validateTableType(tbl);
-    SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(tbl.getDbName()),
-        HiveStringUtils.normalizeIdentifier(tbl.getTableName()), tbl);
+    try {
+      // Wait if background cache update is happening
+      tableCacheLock.readLock().lock();
+      isTableCacheDirty.set(true);
+      SharedCache.addTableToCache(HiveStringUtils.normalizeIdentifier(tbl.getDbName()),
+          HiveStringUtils.normalizeIdentifier(tbl.getTableName()), tbl);
+    } finally {
+      tableCacheLock.readLock().unlock();
+    }
   }
 
   @Override
-  public boolean dropTable(String dbName, String tableName)
-      throws MetaException, NoSuchObjectException, InvalidObjectException,
-      InvalidInputException {
+  public boolean dropTable(String dbName, String tableName) throws MetaException,
+      NoSuchObjectException, InvalidObjectException, InvalidInputException {
     boolean succ = rawStore.dropTable(dbName, tableName);
     if (succ) {
-      interruptCacheUpdateMaster();
-      SharedCache.removeTableFromCache(HiveStringUtils.normalizeIdentifier(dbName),
-          HiveStringUtils.normalizeIdentifier(tableName));
+      // Remove table
+      try {
+        // Wait if background table cache update is happening
+        tableCacheLock.readLock().lock();
+        isTableCacheDirty.set(true);
+        SharedCache.removeTableFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName));
+      } finally {
+        tableCacheLock.readLock().unlock();
+      }
+      // Remove table col stats
+      try {
+        // Wait if background table col stats cache update is happening
+        tableColStatsCacheLock.readLock().lock();
+        isTableColStatsCacheDirty.set(true);
+        SharedCache.removeTableColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName));
+      } finally {
+        tableColStatsCacheLock.readLock().unlock();
+      }
     }
     return succ;
   }
@@ -496,57 +681,74 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public boolean addPartition(Partition part)
-      throws InvalidObjectException, MetaException {
+  public boolean addPartition(Partition part) throws InvalidObjectException, MetaException {
     boolean succ = rawStore.addPartition(part);
     if (succ) {
-      interruptCacheUpdateMaster();
-      SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()),
-          HiveStringUtils.normalizeIdentifier(part.getTableName()), part);
+      try {
+        // Wait if background cache update is happening
+        partitionCacheLock.readLock().lock();
+        isPartitionCacheDirty.set(true);
+        SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()),
+            HiveStringUtils.normalizeIdentifier(part.getTableName()), part);
+      } finally {
+        partitionCacheLock.readLock().unlock();
+      }
     }
     return succ;
   }
 
   @Override
-  public boolean addPartitions(String dbName, String tblName,
-      List<Partition> parts) throws InvalidObjectException, MetaException {
+  public boolean addPartitions(String dbName, String tblName, List<Partition> parts)
+      throws InvalidObjectException, MetaException {
     boolean succ = rawStore.addPartitions(dbName, tblName, parts);
     if (succ) {
-      interruptCacheUpdateMaster();
-      for (Partition part : parts) {
-        SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(part.getDbName()),
-            HiveStringUtils.normalizeIdentifier(part.getTableName()), part);
+      try {
+        // Wait if background cache update is happening
+        partitionCacheLock.readLock().lock();
+        isPartitionCacheDirty.set(true);
+        for (Partition part : parts) {
+          SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName),
+              HiveStringUtils.normalizeIdentifier(tblName), part);
+        }
+      } finally {
+        partitionCacheLock.readLock().unlock();
       }
     }
     return succ;
   }
 
   @Override
-  public boolean addPartitions(String dbName, String tblName,
-      PartitionSpecProxy partitionSpec, boolean ifNotExists)
-      throws InvalidObjectException, MetaException {
+  public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy partitionSpec,
+      boolean ifNotExists) throws InvalidObjectException, MetaException {
     boolean succ = rawStore.addPartitions(dbName, tblName, partitionSpec, ifNotExists);
     if (succ) {
-      interruptCacheUpdateMaster();
-      PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
-      while (iterator.hasNext()) {
-        Partition part = iterator.next();
-        SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName),
-            HiveStringUtils.normalizeIdentifier(tblName), part);
+      try {
+        // Wait if background cache update is happening
+        partitionCacheLock.readLock().lock();
+        isPartitionCacheDirty.set(true);
+        PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
+        while (iterator.hasNext()) {
+          Partition part = iterator.next();
+          SharedCache.addPartitionToCache(HiveStringUtils.normalizeIdentifier(dbName),
+              HiveStringUtils.normalizeIdentifier(tblName), part);
+        }
+      } finally {
+        partitionCacheLock.readLock().unlock();
       }
     }
     return succ;
   }
 
   @Override
-  public Partition getPartition(String dbName, String tableName,
-      List<String> part_vals) throws MetaException, NoSuchObjectException {
-    Partition part = SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
-        HiveStringUtils.normalizeIdentifier(tableName), part_vals);
+  public Partition getPartition(String dbName, String tableName, List<String> part_vals)
+      throws MetaException, NoSuchObjectException {
+    Partition part =
+        SharedCache.getPartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), part_vals);
     if (part != null) {
       part.unsetPrivileges();
     } else {
-      throw new NoSuchObjectException();
+      throw new NoSuchObjectException("partition values=" + part_vals.toString());
     }
     return part;
   }
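
Note that getPartition is served purely from the cache: a miss raises
NoSuchObjectException rather than consulting the underlying RawStore, so
the getter relies on the cache having been prewarmed and kept fresh by the
background updater. A usage fragment (the CachedStore instance and all
identifiers below are invented):

    try {
      Partition part = cachedStore.getPartition("default", "sales",
          Arrays.asList("2017-05-22"));
      // Privileges are unset on the cached copy before it is returned.
    } catch (NoSuchObjectException e) {
      // Thrown on any cache miss, even if the partition exists in the
      // underlying RawStore.
    }
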
@@ -559,14 +761,30 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public boolean dropPartition(String dbName, String tableName,
-      List<String> part_vals) throws MetaException, NoSuchObjectException,
-      InvalidObjectException, InvalidInputException {
+  public boolean dropPartition(String dbName, String tableName, List<String> part_vals)
+      throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
     boolean succ = rawStore.dropPartition(dbName, tableName, part_vals);
     if (succ) {
-      interruptCacheUpdateMaster();
-      SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
-          HiveStringUtils.normalizeIdentifier(tableName), part_vals);
+      // Remove partition
+      try {
+        // Wait if background cache update is happening
+        partitionCacheLock.readLock().lock();
+        isPartitionCacheDirty.set(true);
+        SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), part_vals);
+      } finally {
+        partitionCacheLock.readLock().unlock();
+      }
+      // Remove partition col stats
+      try {
+        // Wait if background cache update is happening
+        partitionColStatsCacheLock.readLock().lock();
+        isPartitionColStatsCacheDirty.set(true);
+        SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), part_vals);
+      } finally {
+        partitionColStatsCacheLock.readLock().unlock();
+      }
     }
     return succ;
   }
@@ -588,10 +806,28 @@ public class CachedStore implements RawStore, Configurable {
   public void alterTable(String dbName, String tblName, Table newTable)
       throws InvalidObjectException, MetaException {
     rawStore.alterTable(dbName, tblName, newTable);
-    interruptCacheUpdateMaster();
     validateTableType(newTable);
-    SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName),
-        HiveStringUtils.normalizeIdentifier(tblName), newTable);
+    // Update table cache
+    try {
+      // Wait if background cache update is happening
+      tableCacheLock.readLock().lock();
+      isTableCacheDirty.set(true);
+      SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName),
+          HiveStringUtils.normalizeIdentifier(tblName), newTable);
+    } finally {
+      tableCacheLock.readLock().unlock();
+    }
+    // Update partition cache (key might have changed since table name is a
+    // component of key)
+    try {
+      // Wait if background cache update is happening
+      partitionCacheLock.readLock().lock();
+      isPartitionCacheDirty.set(true);
+      SharedCache.alterTableInPartitionCache(HiveStringUtils.normalizeIdentifier(dbName),
+          HiveStringUtils.normalizeIdentifier(tblName), newTable);
+    } finally {
+      partitionCacheLock.readLock().unlock();
+    }
   }
 
   @Override
@@ -685,26 +921,62 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public void alterPartition(String dbName, String tblName,
-      List<String> partVals, Partition newPart)
+  public void alterPartition(String dbName, String tblName, List<String> partVals, Partition newPart)
       throws InvalidObjectException, MetaException {
     rawStore.alterPartition(dbName, tblName, partVals, newPart);
-    interruptCacheUpdateMaster();
-    SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
-        HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+    // Update partition cache
+    try {
+      // Wait if background cache update is happening
+      partitionCacheLock.readLock().lock();
+      isPartitionCacheDirty.set(true);
+      SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
+          HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+    } finally {
+      partitionCacheLock.readLock().unlock();
+    }
+    // Update partition column stats cache
+    try {
+      // Wait if background cache update is happening
+      partitionColStatsCacheLock.readLock().lock();
+      isPartitionColStatsCacheDirty.set(true);
+      SharedCache.alterPartitionInColStatsCache(HiveStringUtils.normalizeIdentifier(dbName),
+          HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+    } finally {
+      partitionColStatsCacheLock.readLock().unlock();
+    }
   }
 
   @Override
-  public void alterPartitions(String dbName, String tblName,
-      List<List<String>> partValsList, List<Partition> newParts)
-      throws InvalidObjectException, MetaException {
+  public void alterPartitions(String dbName, String tblName, List<List<String>> partValsList,
+      List<Partition> newParts) throws InvalidObjectException, MetaException {
     rawStore.alterPartitions(dbName, tblName, partValsList, newParts);
-    interruptCacheUpdateMaster();
-    for (int i=0;i<partValsList.size();i++) {
-      List<String> partVals = partValsList.get(i);
-      Partition newPart = newParts.get(i);
-      SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
-          HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+    // Update partition cache
+    try {
+      // Wait if background cache update is happening
+      partitionCacheLock.readLock().lock();
+      isPartitionCacheDirty.set(true);
+      for (int i = 0; i < partValsList.size(); i++) {
+        List<String> partVals = partValsList.get(i);
+        Partition newPart = newParts.get(i);
+        SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+      }
+    } finally {
+      partitionCacheLock.readLock().unlock();
+    }
+    // Update partition column stats cache
+    try {
+      // Wait if background cache update is happening
+      partitionColStatsCacheLock.readLock().lock();
+      isPartitionColStatsCacheDirty.set(true);
+      for (int i = 0; i < partValsList.size(); i++) {
+        List<String> partVals = partValsList.get(i);
+        Partition newPart = newParts.get(i);
+        SharedCache.alterPartitionInColStatsCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tblName), partVals, newPart);
+      }
+    } finally {
+      partitionColStatsCacheLock.readLock().unlock();
     }
   }
 
@@ -1095,55 +1367,199 @@ public class CachedStore implements RawStore, Configurable {
 
   @Override
   public boolean updateTableColumnStatistics(ColumnStatistics colStats)
-      throws NoSuchObjectException, MetaException, InvalidObjectException,
-      InvalidInputException {
+      throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
     boolean succ = rawStore.updateTableColumnStatistics(colStats);
     if (succ) {
-      SharedCache.updateTableColumnStatistics(HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()),
-          HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), colStats.getStatsObj());
+      String dbName = colStats.getStatsDesc().getDbName();
+      String tableName = colStats.getStatsDesc().getTableName();
+      List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
+      Table tbl = getTable(dbName, tableName);
+      List<String> colNames = new ArrayList<>();
+      for (ColumnStatisticsObj statsObj : statsObjs) {
+        colNames.add(statsObj.getColName());
+      }
+      StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames);
+
+      // Update table
+      try {
+        // Wait if background cache update is happening
+        tableCacheLock.readLock().lock();
+        isTableCacheDirty.set(true);
+        SharedCache.alterTableInCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), tbl);
+      } finally {
+        tableCacheLock.readLock().unlock();
+      }
+
+      // Update table col stats
+      try {
+        // Wait if background cache update is happening
+        tableColStatsCacheLock.readLock().lock();
+        isTableColStatsCacheDirty.set(true);
+        SharedCache.updateTableColStatsInCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), statsObjs);
+      } finally {
+        tableColStatsCacheLock.readLock().unlock();
+      }
     }
     return succ;
   }
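
The statistics update above is a two-step write: first mark the affected
columns as accurate in the table's parameters, then push the table and its
stats objects into their respective caches, each under its own
lock/dirty-flag pair. Since the try/lock/set-dirty/finally boilerplate
recurs in every mutator in this file, one conceivable consolidation (a
refactoring sketch, not something this commit does) is to bundle each
lock/flag pair:

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public final class CacheSection {
      private final ReadWriteLock lock = new ReentrantReadWriteLock();
      private final AtomicBoolean dirty = new AtomicBoolean(false);

      // Run a point mutation under the read lock and mark the section dirty,
      // mirroring the inline pattern used by each CachedStore mutator.
      public void mutate(Runnable mutation) {
        lock.readLock().lock();
        try {
          dirty.set(true);
          mutation.run();
        } finally {
          lock.readLock().unlock();
        }
      }
    }

With such a helper, a method like updateTableColumnStatistics would shrink
to one mutate(...) call per cache section.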
 
   @Override
-  public boolean updatePartitionColumnStatistics(ColumnStatistics colStats,
-      List<String> partVals) throws NoSuchObjectException, MetaException,
-      InvalidObjectException, InvalidInputException {
-    boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals);
+  public ColumnStatistics getTableColumnStatistics(String dbName, String tableName,
+      List<String> colNames) throws MetaException, NoSuchObjectException {
+    ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName);
+    List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>();
+    for (String colName : colNames) {
+      String colStatsCacheKey =
+          CacheUtils.buildKey(HiveStringUtils.normalizeIdentifier(dbName),
+              HiveStringUtils.normalizeIdentifier(tableName), colName);
+      ColumnStatisticsObj colStat = SharedCache.getCachedTableColStats(colStatsCacheKey);
+      if (colStat != null) {
+        colStatObjs.add(colStat);
+      }
+    }
+    if (colStatObjs.isEmpty()) {
+      return null;
+    } else {
+      return new ColumnStatistics(csd, colStatObjs);
+    }
+  }
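
The cache read above has partial-hit semantics: a column with no cached
entry is silently skipped rather than triggering a RawStore fallback, and
null comes back only when none of the requested columns were cached. A
usage fragment (instance and identifiers invented):

    ColumnStatistics stats = cachedStore.getTableColumnStatistics(
        "default", "sales", Arrays.asList("price", "quantity", "uncached_col"));
    if (stats == null) {
      // No requested column had cached statistics.
    } else {
      // stats.getStatsObj() may hold fewer entries than requested; here at
      // most two, if "uncached_col" has no cache entry.
    }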
+
+  @Override
+  public boolean deleteTableColumnStatistics(String dbName, String tableName, String colName)
+      throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
+    boolean succ = rawStore.deleteTableColumnStatistics(dbName, tableName, colName);
     if (succ) {
-      SharedCache.updatePartitionColumnStatistics(HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()),
-          HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals, colStats.getStatsObj());
+      try {
+        // Wait if background cache update is happening
+        tableColStatsCacheLock.readLock().lock();
+        isTableColStatsCacheDirty.set(true);
+        SharedCache.removeTableColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), colName);
+      } finally {
+        tableColStatsCacheLock.readLock().unlock();
+      }
     }
     return succ;
   }
 
   @Override
-  public ColumnStatistics getTableColumnStatistics(String dbName,
-      String tableName, List<String> colName)
-      throws MetaException, NoSuchObjectException {
-    return rawStore.getTableColumnStatistics(dbName, tableName, colName);
+  public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List<String> partVals)
+      throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
+    boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals);
+    if (succ) {
+      String dbName = colStats.getStatsDesc().getDbName();
+      String tableName = colStats.getStatsDesc().getTableName();
+      List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
+      Partition part = getPartition(dbName, tableName, partVals);
+      List<String> colNames = new ArrayList<>();
+      for (ColumnStatisticsObj statsObj : statsObjs) {
+        colNames.add(statsObj.getColName());
+      }
+      StatsSetupConst.setColumnStatsState(part.getParameters(), colNames);
+
+      // Update partition
+      try {
+        // Wait if background cache update is happening
+        partitionCacheLock.readLock().lock();
+        isPartitionCacheDirty.set(true);
+        SharedCache.alterPartitionInCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), partVals, part);
+      } finally {
+        partitionCacheLock.readLock().unlock();
+      }
+
+      // Update partition column stats
+      try {
+        // Wait if background cache update is happening
+        partitionColStatsCacheLock.readLock().lock();
+        isPartitionColStatsCacheDirty.set(true);
+        SharedCache.updatePartitionColStatsInCache(
+            HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()),
+            HiveStringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals,
+            colStats.getStatsObj());
+      } finally {
+        partitionColStatsCacheLock.readLock().unlock();
+      }
+    }
+    return succ;
   }
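
The partition path mirrors the table path: fetch the cached partition, mark
the updated columns as accurate in its parameters via
StatsSetupConst.setColumnStatsState, then write both the partition and the
stats objects back through their caches. The parameter-marking half,
reduced to a generic sketch (the real encoding under Hive's table and
partition parameters is StatsSetupConst's concern; the flat key-per-column
scheme below is purely illustrative):

    import java.util.List;
    import java.util.Map;

    public class StatsStateSketch {
      // Record, per column, that statistics are up to date.
      static void markColumnsAccurate(Map<String, String> params,
          List<String> colNames) {
        for (String col : colNames) {
          params.put("COLUMN_STATS_ACCURATE." + col, "true");
        }
      }
    }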
 
   @Override
-  public List<ColumnStatistics> getPartitionColumnStatistics(String dbName,
-      String tblName, List<String> partNames, List<String> colNames)
-      throws MetaException, NoSuchObjectException {
+  // TODO: calculate from cached values.
+  // Need to see if it makes sense to do this, as some col stats may be out of date or missing from the cache.
+  public List<ColumnStatistics> getPartitionColumnStatistics(String dbName, String tblName,
+      List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException {
     return rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames);
   }
 
   @Override
-  public boolean deletePartitionColumnStatistics(String dbName,
-      String tableName, String partName, List<String> partVals, String colName)
-      throws NoSuchObjectException, MetaException, InvalidObjectException,
-      InvalidInputException {
-    return rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName);
+  public boolean deletePartitionColumnStatistics(String dbName, String tableName, String partName,
+      List<String> partVals, String colName) throws NoSuchObjectException, MetaException,
+      InvalidObjectException, InvalidInputException {
+    boolean succ =
+        rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName);
+    if (succ) {
+      try {
+        // Wait if background cache update is happening
+        partitionColStatsCacheLock.readLock().lock();
+        isPartitionColStatsCacheDirty.set(true);
+        SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tableName), partVals, colName);
+      } finally {
+        partitionColStatsCacheLock.readLock().unlock();
+      }
+    }
+    return succ;
   }
 
   @Override
-  public boolean deleteTableColumnStatistics(String dbName, String tableName,
-      String colName) throws NoSuchObjectException, MetaException,
-      InvalidObjectException, InvalidInputException {
-    return rawStore.deleteTableColumnStatistics(dbName, tableName, colName);
+  public AggrStats get_aggr_stats_for(String dbName, String tblName, List<String> partNames,
+      List<String> colNames) throws MetaException, NoSuchObjectException {
+    List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(colNames.size());
+    for (String colName : colNames) {
+      ColumnStatisticsObj colStat =
+          mergeColStatsForPartitions(HiveStringUtils.normalizeIdentifier(dbName),
+              HiveStringUtils.normalizeIdentifier(tblName), partNames, colName);
+      if (colStat == null) {
+        // Stop and fall back to underlying RawStore
+        colStats = null;
+        break;
+      } else {
+        colStats.add(colStat);
+      }
+    }
+    if (colStats == null) {
+      return rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+    } else {
+      return new AggrStats(colStats, partNames.size());
+    }
+  }
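
Aggregation here is all-or-nothing per column: if any partition lacks a
cached stats object for a column, the whole request falls back to the
underlying RawStore instead of extrapolating. The control flow, reduced to
its essentials (Long stands in for ColumnStatisticsObj, and max() stands in
for the real per-type merge):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class AggrFallbackSketch {
      // Returns null when any (partition, column) stats entry is missing,
      // signalling the caller to fall back to the authoritative store.
      static Long mergeForPartitions(Map<String, Long> cache,
          List<String> partNames, String colName) {
        Long merged = null;
        for (String partName : partNames) {
          Long forPart = cache.get(partName + "#" + colName);
          if (forPart == null) {
            return null; // no extrapolation: one miss aborts the merge
          }
          merged = (merged == null) ? forPart : Math.max(merged, forPart);
        }
        return merged;
      }

      static List<Long> aggregate(Map<String, Long> cache,
          List<String> partNames, List<String> colNames) {
        List<Long> results = new ArrayList<>(colNames.size());
        for (String colName : colNames) {
          Long merged = mergeForPartitions(cache, partNames, colName);
          if (merged == null) {
            return null; // caller then uses rawStore.get_aggr_stats_for
          }
          results.add(merged);
        }
        return results;
      }
    }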
+
+  private ColumnStatisticsObj mergeColStatsForPartitions(String dbName, String tblName,
+      List<String> partNames, String colName) throws MetaException {
+    ColumnStatisticsObj colStats = null;
+    for (String partName : partNames) {
+      String colStatsCacheKey =
+          CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName);
+      ColumnStatisticsObj colStatsForPart =
+          SharedCache.getCachedPartitionColStats(colStatsCacheKey);
+      if (colStatsForPart == null) {
+        // We don't have stats for all the partitions, and extrapolation
+        // logic hasn't been added to CachedStore yet; stop now and let the
+        // caller fall back to the underlying RawStore.
+        return null;
+      }
+      if (colStats == null) {
+        colStats = colStatsForPart;
+      } else {
+        ColumnStatsMerger merger =
+            ColumnStatsMergerFactory.getColumnStatsMerger(colStats, colStatsForPart);
+        merger.merge(colStats, colStatsForPart);
+      }
+    }
+    return colStats;
   }
 
   @Override
@@ -1209,14 +1625,34 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public void dropPartitions(String dbName, String tblName,
-      List<String> partNames) throws MetaException, NoSuchObjectException {
+  public void dropPartitions(String dbName, String tblName, List<String> partNames)
+      throws MetaException, NoSuchObjectException {
     rawStore.dropPartitions(dbName, tblName, partNames);
-    interruptCacheUpdateMaster();
-    for (String partName : partNames) {
-      List<String> vals = partNameToVals(partName);
-      SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
-          HiveStringUtils.normalizeIdentifier(tblName), vals);
+    // Remove partitions
+    try {
+      // Wait if background cache update is happening
+      partitionCacheLock.readLock().lock();
+      isPartitionCacheDirty.set(true);
+      for (String partName : partNames) {
+        List<String> vals = partNameToVals(partName);
+        SharedCache.removePartitionFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tblName), vals);
+      }
+    } finally {
+      partitionCacheLock.readLock().unlock();
+    }
+    // Remove partition col stats
+    try {
+      // Wait if background cache update is happening
+      partitionColStatsCacheLock.readLock().lock();
+      isPartitionColStatsCacheDirty.set(true);
+      for (String partName : partNames) {
+        List<String> part_vals = partNameToVals(partName);
+        SharedCache.removePartitionColStatsFromCache(HiveStringUtils.normalizeIdentifier(dbName),
+            HiveStringUtils.normalizeIdentifier(tblName), part_vals);
+      }
+    } finally {
+      partitionColStatsCacheLock.readLock().unlock();
     }
   }
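
dropPartitions works from partition names while the cache is keyed by
partition values, hence the partNameToVals conversion before each removal.
Roughly, the conversion does the following (a hedged sketch; the real
helper delegates to the metastore's name-parsing utilities, which also
unescape values):

    import java.util.ArrayList;
    import java.util.List;

    public class PartNameSketch {
      // "ds=2017-05-22/hr=01" -> ["2017-05-22", "01"]
      static List<String> partNameToVals(String partName) {
        List<String> vals = new ArrayList<>();
        for (String kv : partName.split("/")) {
          vals.add(kv.substring(kv.indexOf('=') + 1));
        }
        return vals;
      }
    }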
 
@@ -1326,130 +1762,6 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public AggrStats get_aggr_stats_for(String dbName, String tblName,
-      List<String> partNames, List<String> colNames)
-      throws MetaException, NoSuchObjectException {
-    List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(colNames.size());
-    for (String colName : colNames) {
-      colStats.add(mergeColStatsForPartitions(HiveStringUtils.normalizeIdentifier(dbName),
-          HiveStringUtils.normalizeIdentifier(tblName), partNames, colName));
-    }
-    // TODO: revisit the partitions not found case for extrapolation
-    return new AggrStats(colStats, partNames.size());
-  }
-
-  private ColumnStatisticsObj mergeColStatsForPartitions(String dbName, String tblName,
-      List<String> partNames, String colName) throws MetaException {
-    ColumnStatisticsObj colStats = null;
-    for (String partName : partNames) {
-      String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName);
-      ColumnStatisticsObj colStatsForPart = SharedCache.getCachedPartitionColStats(
-          colStatsCacheKey);
-      if (colStats == null) {
-        colStats = colStatsForPart;
-      } else {
-        colStats = mergeColStatsObj(colStats, colStatsForPart);
-      }
-    }
-    return colStats;
-  }
-
-  private ColumnStatisticsObj mergeColStatsObj(ColumnStatisticsObj colStats1,
-      ColumnStatisticsObj colStats2) throws MetaException {
-    if ((!colStats1.getColType().equalsIgnoreCase(colStats2.getColType()))
-        || (!colStats1.getColName().equalsIgnoreCase(colStats2.getColName()))) {
-      throw new MetaException("Can't merge column stats for two partitions for different columns.");
-    }
-    ColumnStatisticsData csd = new ColumnStatisticsData();
-    ColumnStatisticsObj cso = new ColumnStatisticsObj(colStats1.getColName(),
-        colStats1.getColType(), csd);
-    ColumnStatisticsData csData1 = colStats1.getStatsData();
-    ColumnStatisticsData csData2 = colStats2.getStatsData();
-    String colType = colStats1.getColType().toLowerCase();
-    if (colType.equals("boolean")) {
-      BooleanColumnStatsData boolStats = new BooleanColumnStatsData();
-      boolStats.setNumFalses(csData1.getBooleanStats().getNumFalses()
-          + csData2.getBooleanStats().getNumFalses());
-      boolStats.setNumTrues(csData1.getBooleanStats().getNumTrues()
-          + csData2.getBooleanStats().getNumTrues());
-      boolStats.setNumNulls(csData1.getBooleanStats().getNumNulls()
-          + csData2.getBooleanStats().getNumNulls());
-      csd.setBooleanStats(boolStats);
-    } else if (colType.equals("string") || colType.startsWith("varchar")
-        || colType.startsWith("char")) {
-      StringColumnStatsData stringStats = new StringColumnStatsData();
-      stringStats.setNumNulls(csData1.getStringStats().getNumNulls()
-          + csData2.getStringStats().getNumNulls());
-      stringStats.setAvgColLen(Math.max(csData1.getStringStats().getAvgColLen(), csData2
-          .getStringStats().getAvgColLen()));
-      stringStats.setMaxColLen(Math.max(csData1.getStringStats().getMaxColLen(), csData2
-          .getStringStats().getMaxColLen()));
-      stringStats.setNumDVs(Math.max(csData1.getStringStats().getNumDVs(), csData2.getStringStats()
-          .getNumDVs()));
-      csd.setStringStats(stringStats);
-    } else if (colType.equals("binary")) {
-      BinaryColumnStatsData binaryStats = new BinaryColumnStatsData();
-      binaryStats.setNumNulls(csData1.getBinaryStats().getNumNulls()
-          + csData2.getBinaryStats().getNumNulls());
-      binaryStats.setAvgColLen(Math.max(csData1.getBinaryStats().getAvgColLen(), csData2
-          .getBinaryStats().getAvgColLen()));
-      binaryStats.setMaxColLen(Math.max(csData1.getBinaryStats().getMaxColLen(), csData2
-          .getBinaryStats().getMaxColLen()));
-      csd.setBinaryStats(binaryStats);
-    } else if (colType.equals("bigint") || colType.equals("int") || colType.equals("smallint")
-        || colType.equals("tinyint") || colType.equals("timestamp")) {
-      LongColumnStatsData longStats = new LongColumnStatsData();
-      longStats.setNumNulls(csData1.getLongStats().getNumNulls()
-          + csData2.getLongStats().getNumNulls());
-      longStats.setHighValue(Math.max(csData1.getLongStats().getHighValue(), csData2.getLongStats()
-          .getHighValue()));
-      longStats.setLowValue(Math.min(csData1.getLongStats().getLowValue(), csData2.getLongStats()
-          .getLowValue()));
-      longStats.setNumDVs(Math.max(csData1.getLongStats().getNumDVs(), csData2.getLongStats()
-          .getNumDVs()));
-      csd.setLongStats(longStats);
-    } else if (colType.equals("date")) {
-      DateColumnStatsData dateStats = new DateColumnStatsData();
-      dateStats.setNumNulls(csData1.getDateStats().getNumNulls()
-          + csData2.getDateStats().getNumNulls());
-      dateStats.setHighValue(new Date(Math.max(csData1.getDateStats().getHighValue()
-          .getDaysSinceEpoch(), csData2.getDateStats().getHighValue().getDaysSinceEpoch())));
-      dateStats.setLowValue(new Date(Math.min(csData1.getDateStats().getLowValue()
-          .getDaysSinceEpoch(), csData2.getDateStats().getLowValue().getDaysSinceEpoch())));
-      dateStats.setNumDVs(Math.max(csData1.getDateStats().getNumDVs(), csData2.getDateStats()
-          .getNumDVs()));
-      csd.setDateStats(dateStats);
-    } else if (colType.equals("double") || colType.equals("float")) {
-      DoubleColumnStatsData doubleStats = new DoubleColumnStatsData();
-      doubleStats.setNumNulls(csData1.getDoubleStats().getNumNulls()
-          + csData2.getDoubleStats().getNumNulls());
-      doubleStats.setHighValue(Math.max(csData1.getDoubleStats().getHighValue(), csData2
-          .getDoubleStats().getHighValue()));
-      doubleStats.setLowValue(Math.min(csData1.getDoubleStats().getLowValue(), csData2
-          .getDoubleStats().getLowValue()));
-      doubleStats.setNumDVs(Math.max(csData1.getDoubleStats().getNumDVs(), csData2.getDoubleStats()
-          .getNumDVs()));
-      csd.setDoubleStats(doubleStats);
-    } else if (colType.startsWith("decimal")) {
-      DecimalColumnStatsData decimalStats = new DecimalColumnStatsData();
-      decimalStats.setNumNulls(csData1.getDecimalStats().getNumNulls()
-          + csData2.getDecimalStats().getNumNulls());
-      Decimal high = (csData1.getDecimalStats().getHighValue()
-          .compareTo(csData2.getDecimalStats().getHighValue()) > 0) ? csData1.getDecimalStats()
-          .getHighValue() : csData2.getDecimalStats().getHighValue();
-      decimalStats.setHighValue(high);
-      Decimal low = (csData1.getDecimalStats().getLowValue()
-          .compareTo(csData2.getDecimalStats().getLowValue()) < 0) ? csData1.getDecimalStats()
-          .getLowValue() : csData2.getDecimalStats().getLowValue();
-      decimalStats.setLowValue(low);
-      decimalStats.setNumDVs(Math.max(csData1.getDecimalStats().getNumDVs(), csData2
-          .getDecimalStats().getNumDVs()));
-      csd.setDecimalStats(decimalStats);
-    }
-    return cso;
-  }
-
-  @Override
   public NotificationEventResponse getNextNotification(
       NotificationEventRequest rqst) {
     return rawStore.getNextNotification(rqst);
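
The removed mergeColStatsObj above hard-coded one merge rule per column
type in a single method; this commit replaces it with per-type mergers
dispatched through ColumnStatsMergerFactory (including the new
DateColumnStatsMerger). The shape of that design, with invented stand-in
types, preserving the merge semantics of the removed long branch (sum null
counts, take the min low / max high, keep the larger distinct-value
estimate):

    // Invented stand-ins for ColumnStatsMerger and its per-type subclasses.
    interface StatsMerger<T> {
      T merge(T aggregate, T next);
    }

    final class LongStatsSketchMerger implements StatsMerger<long[]> {
      // Illustrative layout: {numNulls, lowValue, highValue, numDVs}.
      @Override
      public long[] merge(long[] a, long[] b) {
        return new long[] {
            a[0] + b[0],          // null counts add across partitions
            Math.min(a[1], b[1]), // low value: minimum wins
            Math.max(a[2], b[2]), // high value: maximum wins
            Math.max(a[3], b[3])  // distinct values: keep the larger estimate
        };
      }
    }
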
@@ -1565,10 +1877,9 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(
-      String dbName, String tableName)
-      throws MetaException, NoSuchObjectException {
-    return rawStore.getAggrColStatsForTablePartitions(dbName, tableName);
+  public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName,
+      String tableName) throws MetaException, NoSuchObjectException {
+    return rawStore.getColStatsForTablePartitions(dbName, tableName);
   }
 
   public RawStore getRawStore() {

