impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject [3/7] impala git commit: Revert "IMPALA-5538: Use explicit catalog versions for deleted objects"
Date Wed, 29 Nov 2017 06:41:52 GMT
Revert "IMPALA-5538: Use explicit catalog versions for deleted objects"

This reverts commit dd340b8810ecd00ad2ffe79845ca137e941aefb7.
This commit caused a number of issues tracked in IMPALA-6001. The
issues were due to the lack of atomicity between the catalog version
change and the addition to the delete log of a catalog object.

Conflicts:
	be/src/service/impala-server.cc

Change-Id: I3a2cddee5d565384e9de0e61b3b7d0d9075e0dce
Reviewed-on: http://gerrit.cloudera.org:8080/8667
Reviewed-by: Dimitris Tsirogiannis <dtsirogiannis@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/a88c3b9c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/a88c3b9c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/a88c3b9c

Branch: refs/heads/master
Commit: a88c3b9c529b2215b232455598f8f8332c27f996
Parents: 0588309
Author: Dimitris Tsirogiannis <dtsirogiannis@cloudera.com>
Authored: Tue Nov 28 09:58:25 2017 -0800
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Wed Nov 29 02:19:50 2017 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc                |  57 ++--
 be/src/catalog/catalog-server.h                 |  39 ++-
 be/src/catalog/catalog-util.cc                  |  15 +
 be/src/catalog/catalog-util.h                   |   9 +
 be/src/catalog/catalog.cc                       |  19 +-
 be/src/catalog/catalog.h                        |  14 +-
 be/src/scheduling/admission-controller.cc       |  18 +-
 be/src/scheduling/admission-controller.h        |   7 +-
 be/src/scheduling/scheduler-test-util.cc        |   5 +-
 be/src/scheduling/scheduler.cc                  |  22 +-
 be/src/service/impala-server.cc                 | 121 ++++----
 be/src/statestore/statestore.cc                 |  56 ++--
 be/src/statestore/statestore.h                  |  43 ++-
 common/thrift/CatalogInternalService.thrift     |  24 +-
 common/thrift/StatestoreService.thrift          |  31 +--
 .../apache/impala/catalog/CatalogDeltaLog.java  | 104 +++----
 .../impala/catalog/CatalogServiceCatalog.java   | 275 +++++++------------
 .../apache/impala/catalog/ImpaladCatalog.java   |  22 +-
 .../impala/service/CatalogOpExecutor.java       |  15 -
 .../org/apache/impala/service/JniCatalog.java   |  11 +-
 tests/statestore/test_statestore.py             |   8 +-
 21 files changed, 438 insertions(+), 477 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index b004b22..15685d0 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -228,10 +228,13 @@ void CatalogServer::UpdateCatalogTopicCallback(
 
   const TTopicDelta& delta = topic->second;
 
-  // If not generating a delta update and 'pending_topic_updates_' doesn't already contain
-  // the full catalog (beginning with version 0), then force GatherCatalogUpdatesThread()
-  // to reload the full catalog.
-  if (delta.from_version == 0 && catalog_objects_min_version_ != 0) {
+  // If this is not a delta update, clear all catalog objects and request an update
+  // from version 0 from the local catalog. As an optimization, check whether
+  // pending_topic_updates_ was just reloaded from version 0; if it was, skip this
+  // step and use that data.
+  if (delta.from_version == 0 && delta.to_version == 0 &&
+      catalog_objects_min_version_ != 0) {
+    catalog_topic_entry_keys_.clear();
     last_sent_catalog_version_ = 0L;
   } else {
     // Process the pending topic update.
@@ -281,17 +284,14 @@ void CatalogServer::UpdateCatalogTopicCallback(
     } else if (current_catalog_version != last_sent_catalog_version_) {
       // If there has been a change since the last time the catalog was queried,
       // call into the Catalog to find out what has changed.
-      TGetCatalogDeltaResponse catalog_objects;
-      status = catalog_->GetCatalogDelta(last_sent_catalog_version_, &catalog_objects);
+      TGetAllCatalogObjectsResponse catalog_objects;
+      status = catalog_->GetAllCatalogObjects(last_sent_catalog_version_,
+          &catalog_objects);
       if (!status.ok()) {
         LOG(ERROR) << status.GetDetail();
       } else {
-        // Use the catalog objects to build a topic update list. These include
-        // objects added to the catalog, 'updated_objects', and objects deleted
-        // from the catalog, 'deleted_objects'. The order in which we process
-        // these two disjoint sets of catalog objects does not matter.
-        BuildTopicUpdates(catalog_objects.updated_objects, false);
-        BuildTopicUpdates(catalog_objects.deleted_objects, true);
+        // Use the catalog objects to build a topic update list.
+        BuildTopicUpdates(catalog_objects.objects);
         catalog_objects_min_version_ = last_sent_catalog_version_;
         catalog_objects_max_version_ = catalog_objects.max_catalog_version;
       }
@@ -302,19 +302,31 @@ void CatalogServer::UpdateCatalogTopicCallback(
   }
 }
 
-void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_objects,
-    bool topic_deletions) {
+void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_objects) {
+  unordered_set<string> current_entry_keys;
+  // Add any new/updated catalog objects to the topic.
   for (const TCatalogObject& catalog_object: catalog_objects) {
-    DCHECK_GT(catalog_object.catalog_version, last_sent_catalog_version_);
     const string& entry_key = TCatalogObjectToEntryKey(catalog_object);
     if (entry_key.empty()) {
       LOG_EVERY_N(WARNING, 60) << "Unable to build topic entry key for TCatalogObject: "
                                << ThriftDebugString(catalog_object);
     }
+
+    current_entry_keys.insert(entry_key);
+    // Remove this entry from catalog_topic_entry_keys_. At the end of this loop, we will
+    // be left with the set of keys that were in the last update, but not in this
+    // update, indicating which objects have been removed/dropped.
+    catalog_topic_entry_keys_.erase(entry_key);
+
+    // This isn't a new or an updated item, skip it.
+    if (catalog_object.catalog_version <= last_sent_catalog_version_) continue;
+
+    VLOG(1) << "Publishing update: " << entry_key << "@"
+            << catalog_object.catalog_version;
+
     pending_topic_updates_.push_back(TTopicItem());
     TTopicItem& item = pending_topic_updates_.back();
     item.key = entry_key;
-    item.deleted = topic_deletions;
     Status status = thrift_serializer_.Serialize(&catalog_object, &item.value);
     if (!status.ok()) {
       LOG(ERROR) << "Error serializing topic value: " << status.GetDetail();
@@ -328,9 +340,18 @@ void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_obje
         pending_topic_updates_.pop_back();
       }
     }
-    VLOG(1) << "Publishing " << (topic_deletions ? "deletion " : "update ")
-        << ": " << entry_key << "@" << catalog_object.catalog_version;
   }
+
+  // Any remaining items in catalog_topic_entry_keys_ indicate the object was removed
+  // since the last update.
+  for (const string& key: catalog_topic_entry_keys_) {
+    pending_topic_updates_.push_back(TTopicItem());
+    TTopicItem& item = pending_topic_updates_.back();
+    item.key = key;
+    VLOG(1) << "Publishing deletion: " << key;
+    // Don't set a value to mark this item as deleted.
+  }
+  catalog_topic_entry_keys_.swap(current_entry_keys);
 }
 
 void CatalogServer::CatalogUrlCallback(const Webserver::ArgumentMap& args,

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/catalog/catalog-server.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 78a3f20..bf88e00 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -88,6 +88,12 @@ class CatalogServer {
   /// Thread that polls the catalog for any updates.
   std::unique_ptr<Thread> catalog_update_gathering_thread_;
 
+  /// Tracks the set of catalog objects that exist via their topic entry key.
+  /// During each IMPALA_CATALOG_TOPIC heartbeat, stores the set of known catalog objects
+  /// that exist by their topic entry key. Used to track objects that have been removed
+  /// since the last heartbeat.
+  boost::unordered_set<std::string> catalog_topic_entry_keys_;
+
   /// Protects catalog_update_cv_, pending_topic_updates_,
   /// catalog_objects_to/from_version_, and last_sent_catalog_version.
   boost::mutex catalog_lock_;
@@ -129,10 +135,14 @@ class CatalogServer {
   /// finds all catalog objects that have a catalog version greater than the last update
   /// sent by calling into the JniCatalog. The topic is updated with any catalog objects
   /// that are new or have been modified since the last heartbeat (by comparing the
-  /// catalog version of the object with last_sent_catalog_version_). At the end of
-  /// execution it notifies the catalog_update_gathering_thread_ to fetch the next set of
-  /// updates from the JniCatalog. All updates are added to the subscriber_topic_updates
-  /// list and sent back to the Statestore.
+  /// catalog version of the object with last_sent_catalog_version_). Also determines any
+  /// deletions of catalog objects by looking at the
+  /// difference of the last set of topic entry keys that were sent and the current set
+  /// of topic entry keys. At the end of execution it notifies the
+  /// catalog_update_gathering_thread_ to fetch the next set of updates from the
+  /// JniCatalog.
+  /// All updates are added to the subscriber_topic_updates list and sent back to the
+  /// Statestore.
   void UpdateCatalogTopicCallback(
       const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
       std::vector<TTopicDelta>* subscriber_topic_updates);
@@ -143,19 +153,20 @@ class CatalogServer {
   /// Also, explicitly releases free memory back to the OS after each complete iteration.
   [[noreturn]] void GatherCatalogUpdatesThread();
 
-  /// Builds the next topic update to send based on what items
-  /// have been added/changed/removed from the catalog since the last hearbeat. To do
-  /// this, it enumerates the given catalog objects returned looking for the objects that
-  /// have a catalog version that is > the catalog version sent with the last heartbeat.
-  /// 'topic_deletions' is true if 'catalog_objects' contain deleted catalog
-  /// objects.
-  ///
+  /// This function determines what items have been added/removed from the catalog
+  /// since the last heartbeat and builds the next topic update to send. To do this, it
+  /// enumerates the given catalog objects returned looking for the objects that have a
+  /// catalog version that is > the catalog version sent with the last heartbeat. To
+  /// determine items that have been deleted, it saves the set of topic entry keys sent
+  /// with the last update and looks at the difference between it and the current set of
+  /// topic entry keys.
   /// The key for each entry is a string composed of:
   /// "TCatalogObjectType:<unique object name>". So for table foo.bar, the key would be
   /// "TABLE:foo.bar". Encoding the object type information in the key ensures the keys
-  /// are unique. Must hold catalog_lock_ when calling this function.
-  void BuildTopicUpdates(const std::vector<TCatalogObject>& catalog_objects,
-      bool topic_deletions);
+  /// are unique and also helps determine what object type was removed in a state
+  /// store delta update (since the state store only sends key names for deleted items).
+  /// Must hold catalog_lock_ when calling this function.
+  void BuildTopicUpdates(const std::vector<TCatalogObject>& catalog_objects);
 
   /// Example output:
   /// "databases": [

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/catalog/catalog-util.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-util.cc b/be/src/catalog/catalog-util.cc
index 7b115b0..de4f2fd 100644
--- a/be/src/catalog/catalog-util.cc
+++ b/be/src/catalog/catalog-util.cc
@@ -55,6 +55,21 @@ TCatalogObjectType::type TCatalogObjectTypeFromName(const string& name) {
   return TCatalogObjectType::UNKNOWN;
 }
 
+Status TCatalogObjectFromEntryKey(const string& key,
+    TCatalogObject* catalog_object) {
+  // Reconstruct the object based only on the key.
+  size_t pos = key.find(":");
+  if (pos == string::npos || pos >= key.size() - 1) {
+    stringstream error_msg;
+    error_msg << "Invalid topic entry key format: " << key;
+    return Status(error_msg.str());
+  }
+
+  TCatalogObjectType::type object_type = TCatalogObjectTypeFromName(key.substr(0, pos));
+  string object_name = key.substr(pos + 1);
+  return TCatalogObjectFromObjectName(object_type, object_name, catalog_object);
+}
+
 Status TCatalogObjectFromObjectName(const TCatalogObjectType::type& object_type,
     const string& object_name, TCatalogObject* catalog_object) {
   switch (object_type) {

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/catalog/catalog-util.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-util.h b/be/src/catalog/catalog-util.h
index e98cd38..ddc2c21 100644
--- a/be/src/catalog/catalog-util.h
+++ b/be/src/catalog/catalog-util.h
@@ -32,6 +32,15 @@ class Status;
 /// TCatalogObjectType::UNKNOWN if no match was found.
 TCatalogObjectType::type TCatalogObjectTypeFromName(const std::string& name);
 
+/// Parses the given IMPALA_CATALOG_TOPIC topic entry key to determine the
+/// TCatalogObjectType and unique object name. Populates catalog_object with the result.
+/// This is used to reconstruct type information when an item is deleted from the
+/// topic. At that time the only context available about the object being deleted is
+/// its topic entry key, which contains only the type and object name. The resulting
+/// TCatalogObject can then be used to remove a matching item from the catalog.
+Status TCatalogObjectFromEntryKey(const std::string& key,
+    TCatalogObject* catalog_object);
+
 /// Populates a TCatalogObject based on the given object type (TABLE, DATABASE, etc) and
 /// object name string.
 Status TCatalogObjectFromObjectName(const TCatalogObjectType::type& object_type,

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/catalog/catalog.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index b6dd86a..e7e05da 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -62,7 +62,7 @@ Catalog::Catalog() {
     {"getFunctions", "([B)[B", &get_functions_id_},
     {"checkUserSentryAdmin", "([B)V", &sentry_admin_check_id_},
     {"getCatalogObject", "([B)[B", &get_catalog_object_id_},
-    {"getCatalogDelta", "([B)[B", &get_catalog_delta_id_},
+    {"getCatalogObjects", "(J)[B", &get_catalog_objects_id_},
     {"getCatalogVersion", "()J", &get_catalog_version_id_},
     {"prioritizeLoad", "([B)V", &prioritize_load_id_}};
 
@@ -97,10 +97,19 @@ Status Catalog::GetCatalogVersion(long* version) {
   return Status::OK();
 }
 
-Status Catalog::GetCatalogDelta(long from_version, TGetCatalogDeltaResponse* resp) {
-  TGetCatalogDeltaRequest request;
-  request.__set_from_version(from_version);
-  return JniUtil::CallJniMethod(catalog_, get_catalog_delta_id_, request, resp);
+Status Catalog::GetAllCatalogObjects(long from_version,
+    TGetAllCatalogObjectsResponse* resp) {
+  JNIEnv* jni_env = getJNIEnv();
+  JniLocalFrame jni_frame;
+  RETURN_IF_ERROR(jni_frame.push(jni_env));
+  jvalue requested_from_version;
+  requested_from_version.j = from_version;
+  jbyteArray result_bytes = static_cast<jbyteArray>(
+      jni_env->CallObjectMethod(catalog_, get_catalog_objects_id_,
+      requested_from_version));
+  RETURN_ERROR_IF_EXC(jni_env);
+  RETURN_IF_ERROR(DeserializeThriftMsg(jni_env, result_bytes, resp));
+  return Status::OK();
 }
 
 Status Catalog::ExecDdl(const TDdlExecRequest& req, TDdlExecResponse* resp) {

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/catalog/catalog.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index 3119d60..ab6a2a3 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -56,10 +56,12 @@ class Catalog {
   /// Status object with information on the error will be returned.
   Status GetCatalogVersion(long* version);
 
-  /// Retrieves the catalog objects that were added/modified/deleted since version
-  /// 'from_version'. Returns OK if the operation was successful, otherwise a Status
-  /// object with information on the error will be returned.
-  Status GetCatalogDelta(long from_version, TGetCatalogDeltaResponse* resp);
+  /// Gets all Catalog objects and the metadata that is applicable for the given request.
+  /// Always returns all object names that exist in the Catalog, but allows for extended
+  /// metadata for objects that were modified after the specified version.
+  /// Returns OK if the operation was successful, otherwise a Status object with
+  /// information on the error will be returned.
+  Status GetAllCatalogObjects(long from_version, TGetAllCatalogObjectsResponse* resp);
 
   /// Gets the Thrift representation of a Catalog object. The request is a TCatalogObject
   /// which has the desired TCatalogObjectType and name properly set.
@@ -72,7 +74,7 @@ class Catalog {
   /// match the pattern string. Patterns are "p1|p2|p3" where | denotes choice,
   /// and each pN may contain wildcards denoted by '*' which match all strings.
   /// TODO: GetDbs() and GetTableNames() can probably be scrapped in favor of
-  /// GetCatalogDelta(). Consider removing them and moving everything to use
+  /// GetAllCatalogObjects(). Consider removing them and moving everything to use
   /// that.
   Status GetDbs(const std::string* pattern, TGetDbsResult* dbs);
 
@@ -107,7 +109,7 @@ class Catalog {
   jmethodID exec_ddl_id_;  // JniCatalog.execDdl()
   jmethodID reset_metadata_id_;  // JniCatalog.resetMetdata()
   jmethodID get_catalog_object_id_;  // JniCatalog.getCatalogObject()
-  jmethodID get_catalog_delta_id_;  // JniCatalog.getCatalogDelta()
+  jmethodID get_catalog_objects_id_;  // JniCatalog.getCatalogObjects()
   jmethodID get_catalog_version_id_;  // JniCatalog.getCatalogVersion()
   jmethodID get_dbs_id_; // JniCatalog.getDbs()
   jmethodID get_table_names_id_; // JniCatalog.getTableNames()

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 99f659a..ed4e7e2 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -645,6 +645,7 @@ void AdmissionController::UpdatePoolStats(
         }
       }
       HandleTopicUpdates(delta.topic_entries);
+      HandleTopicDeletions(delta.topic_deletions);
     }
     UpdateClusterAggregates();
   }
@@ -682,10 +683,6 @@ void AdmissionController::HandleTopicUpdates(const vector<TTopicItem>& topic_upd
     // The topic entry from this subscriber is handled specially; the stats coming
     // from the statestore are likely already outdated.
     if (topic_backend_id == host_id_) continue;
-    if (item.deleted) {
-      GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr);
-      continue;
-    }
     TPoolStats remote_update;
     uint32_t len = item.value.size();
     Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
@@ -698,7 +695,18 @@ void AdmissionController::HandleTopicUpdates(const vector<TTopicItem>& topic_upd
   }
 }
 
-void AdmissionController::PoolStats::UpdateAggregates(HostMemMap* host_mem_reserved) {
+void AdmissionController::HandleTopicDeletions(const vector<string>& topic_deletions) {
+  for (const string& topic_key: topic_deletions) {
+    string pool_name;
+    string topic_backend_id;
+    if (!ParsePoolTopicKey(topic_key, &pool_name, &topic_backend_id)) continue;
+    if (topic_backend_id == host_id_) continue;
+    GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr);
+  }
+}
+
+void AdmissionController::PoolStats::UpdateAggregates(
+    HostMemMap* host_mem_reserved) {
   const string& coord_id = parent_->host_id_;
   int64_t num_running = 0;
   int64_t num_queued = 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 2830bee..71c9fa4 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -428,10 +428,13 @@ class AdmissionController {
   void AddPoolUpdates(std::vector<TTopicDelta>* subscriber_topic_updates);
 
   /// Updates the remote stats with per-host topic_updates coming from the statestore.
-  /// Removes remote stats identified by topic deletions coming from the
-  /// statestore. Called by UpdatePoolStats(). Must hold admission_ctrl_lock_.
+  /// Called by UpdatePoolStats(). Must hold admission_ctrl_lock_.
   void HandleTopicUpdates(const std::vector<TTopicItem>& topic_updates);
 
+  /// Removes remote stats identified by the topic_deletions coming from the statestore.
+  /// Called by UpdatePoolStats(). Must hold admission_ctrl_lock_.
+  void HandleTopicDeletions(const std::vector<std::string>& topic_deletions);
+
   /// Re-computes the per-pool aggregate stats and the per-host aggregates in
   /// host_mem_reserved_ using each pool's remote_stats_ and local_stats_.
   /// Called by UpdatePoolStats() after handling updates and deletions.

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/scheduling/scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 05cfc42..6fb2bba 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -478,10 +478,7 @@ void SchedulerWrapper::RemoveBackend(const Host& host) {
   TTopicDelta delta;
   delta.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
   delta.is_delta = true;
-  TTopicItem item;
-  item.__set_deleted(true);
-  item.__set_key(host.ip);
-  delta.topic_entries.push_back(item);
+  delta.topic_deletions.push_back(host.ip);
   SendTopicDelta(delta);
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 5cf0f01..2bd6c96 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -131,7 +131,9 @@ void Scheduler::UpdateMembership(
 
   // If the delta transmitted by the statestore is empty we can skip processing
   // altogether and avoid making a copy of executors_config_.
-  if (delta.is_delta && delta.topic_entries.empty()) return;
+  if (delta.is_delta && delta.topic_entries.empty() && delta.topic_deletions.empty()) {
+    return;
+  }
 
   // This function needs to handle both delta and non-delta updates. To minimize the
   // time needed to hold locks, all updates are applied to a copy of
@@ -148,17 +150,10 @@ void Scheduler::UpdateMembership(
     new_executors_config = std::make_shared<BackendConfig>(*executors_config_);
   }
 
-  // Process new and removed entries to the topic. Update executors_config_ and
+  // Process new entries to the topic. Update executors_config_ and
   // current_executors_ to match the set of executors given by the
   // subscriber_topic_updates.
   for (const TTopicItem& item : delta.topic_entries) {
-    if (item.deleted) {
-      if (current_executors_.find(item.key) != current_executors_.end()) {
-        new_executors_config->RemoveBackend(current_executors_[item.key]);
-        current_executors_.erase(item.key);
-      }
-      continue;
-    }
     TBackendDescriptor be_desc;
     // Benchmarks have suggested that this method can deserialize
     // ~10m messages per second, so no immediate need to consider optimization.
@@ -193,6 +188,15 @@ void Scheduler::UpdateMembership(
       current_executors_.insert(make_pair(item.key, be_desc));
     }
   }
+
+  // Process deletions from the topic
+  for (const string& backend_id : delta.topic_deletions) {
+    if (current_executors_.find(backend_id) != current_executors_.end()) {
+      new_executors_config->RemoveBackend(current_executors_[backend_id]);
+      current_executors_.erase(backend_id);
+    }
+  }
+
   SetExecutorsConfig(new_executors_config);
 
   if (metrics_ != nullptr) {

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index c065282..2222802 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1316,10 +1316,10 @@ void ImpalaServer::CatalogUpdateCallback(
   if (topic == incoming_topic_deltas.end()) return;
   const TTopicDelta& delta = topic->second;
 
+
   // Update catalog cache in frontend. An update is split into batches of size
   // MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES each for multiple updates. IMPALA-3499
-  if (delta.topic_entries.size() != 0)  {
-    vector<TCatalogObject> dropped_objects;
+  if (delta.topic_entries.size() != 0 || delta.topic_deletions.size() != 0)  {
     vector<TUpdateCatalogCacheRequest> update_reqs;
     update_reqs.push_back(TUpdateCatalogCacheRequest());
     TUpdateCatalogCacheRequest* incremental_request = &update_reqs.back();
@@ -1329,6 +1329,7 @@ void ImpalaServer::CatalogUpdateCallback(
     int64_t new_catalog_version = catalog_update_info_.catalog_version;
     uint64_t batch_size_bytes = 0;
     for (const TTopicItem& item: delta.topic_entries) {
+      TCatalogObject catalog_object;
       Status status;
       vector<uint8_t> data_buffer;
       const uint8_t* data_buffer_ptr = nullptr;
@@ -1346,70 +1347,82 @@ void ImpalaServer::CatalogUpdateCallback(
         data_buffer_ptr = reinterpret_cast<const uint8_t*>(item.value.data());
         len = item.value.size();
       }
-      if (len > 100 * 1024 * 1024 /* 100MB */) {
-        LOG(INFO) << "Received large catalog object(>100mb): "
-            << item.key << " is "
-            << PrettyPrinter::Print(len, TUnit::BYTES);
-      }
-      TCatalogObject catalog_object;
       status = DeserializeThriftMsg(data_buffer_ptr, &len, FLAGS_compact_catalog_topic,
           &catalog_object);
       if (!status.ok()) {
         LOG(ERROR) << "Error deserializing item " << item.key
-            << ": " << status.GetDetail();
+                   << ": " << status.GetDetail();
         continue;
       }
+      if (len > 100 * 1024 * 1024 /* 100MB */) {
+        LOG(INFO) << "Received large catalog update(>100mb): "
+                     << item.key << " is "
+                     << PrettyPrinter::Print(len, TUnit::BYTES);
+      }
+      if (catalog_object.type == TCatalogObjectType::CATALOG) {
+        incremental_request->__set_catalog_service_id(
+            catalog_object.catalog.catalog_service_id);
+        new_catalog_version = catalog_object.catalog_version;
+      }
+
+      // Refresh the lib cache entries of any added functions and data sources
+      // TODO: if frontend returns the list of functions and data sources, we do not
+      // need to deserialize these in backend.
+      if (catalog_object.type == TCatalogObjectType::FUNCTION) {
+        DCHECK(catalog_object.__isset.fn);
+        LibCache::instance()->SetNeedsRefresh(catalog_object.fn.hdfs_location);
+      }
+      if (catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
+        DCHECK(catalog_object.__isset.data_source);
+        LibCache::instance()->SetNeedsRefresh(catalog_object.data_source.hdfs_location);
+      }
 
       if (batch_size_bytes + len > MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES) {
-        // Initialize a new batch of catalog updates.
         update_reqs.push_back(TUpdateCatalogCacheRequest());
         incremental_request = &update_reqs.back();
         batch_size_bytes = 0;
       }
+      incremental_request->updated_objects.push_back(catalog_object);
+      batch_size_bytes += len;
+    }
+    update_reqs.push_back(TUpdateCatalogCacheRequest());
+    TUpdateCatalogCacheRequest* deletion_request = &update_reqs.back();
 
-      if (catalog_object.type == TCatalogObjectType::CATALOG) {
-        incremental_request->__set_catalog_service_id(
-            catalog_object.catalog.catalog_service_id);
-        new_catalog_version = catalog_object.catalog_version;
+    // We need to look up the dropped functions and data sources and remove them
+    // from the library cache. The data sent from the catalog service does not
+    // contain all the function metadata so we'll ask our local frontend for it. We
+    // need to do this before updating the catalog.
+    vector<TCatalogObject> dropped_objects;
+
+    // Process all Catalog deletions (dropped objects). We only know the keys (object
+    // names) so must parse each key to determine the TCatalogObject.
+    for (const string& key: delta.topic_deletions) {
+      LOG(INFO) << "Catalog topic entry deletion: " << key;
+      TCatalogObject catalog_object;
+      Status status = TCatalogObjectFromEntryKey(key, &catalog_object);
+      if (!status.ok()) {
+        LOG(ERROR) << "Error parsing catalog topic entry deletion key: " << key << " "
+                   << "Error: " << status.GetDetail();
+        continue;
       }
-      VLOG(1) << (item.deleted ? "Deleted " : "Added ") << "item: " << item.key
-          << " version: " << catalog_object.catalog_version << " of size: " << len;
-
-      if (!item.deleted) {
-        // Refresh the lib cache entries of any added functions and data sources
-        // TODO: if frontend returns the list of functions and data sources, we do not
-        // need to deserialize these in backend.
-        if (catalog_object.type == TCatalogObjectType::FUNCTION) {
-          DCHECK(catalog_object.__isset.fn);
-          LibCache::instance()->SetNeedsRefresh(catalog_object.fn.hdfs_location);
-        }
-        if (catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
-          DCHECK(catalog_object.__isset.data_source);
-          LibCache::instance()->SetNeedsRefresh(catalog_object.data_source.hdfs_location);
-        }
-        incremental_request->updated_objects.push_back(catalog_object);
-      } else {
-        // We need to look up any dropped functions and data sources and remove
-        // them from the library cache.
-        if (catalog_object.type == TCatalogObjectType::FUNCTION ||
-            catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
-          TCatalogObject existing_object;
-          if (exec_env_->frontend()->GetCatalogObject(
-              catalog_object, &existing_object).ok()) {
-            // If the object exists in the catalog it may have been dropped and
-            // re-created. To avoid removing the re-created object's entry from
-            // the cache verify that the existing object's version <= the
-            // version of the dropped object included in this statestore
-            // heartbeat.
-            DCHECK_NE(existing_object.catalog_version, catalog_object.catalog_version);
-            if (existing_object.catalog_version < catalog_object.catalog_version) {
-              dropped_objects.push_back(existing_object);
+      deletion_request->removed_objects.push_back(catalog_object);
+      if (catalog_object.type == TCatalogObjectType::FUNCTION ||
+          catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
+        TCatalogObject dropped_object;
+        if (exec_env_->frontend()->GetCatalogObject(
+                catalog_object, &dropped_object).ok()) {
+          // This object may have been dropped and re-created. To avoid removing the
+          // re-created object's entry from the cache verify the existing object has a
+          // catalog version <= the catalog version included in this statestore heartbeat.
+          if (dropped_object.catalog_version <= new_catalog_version) {
+            if (catalog_object.type == TCatalogObjectType::FUNCTION ||
+                catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
+              dropped_objects.push_back(dropped_object);
             }
           }
         }
-        incremental_request->removed_objects.push_back(catalog_object);
+        // Nothing to do in error case.
       }
-      batch_size_bytes += len;
     }
 
     // Call the FE to apply the changes to the Impalad Catalog.
@@ -1530,10 +1543,6 @@ void ImpalaServer::MembershipCallback(
 
     // Process membership additions.
     for (const TTopicItem& item: delta.topic_entries) {
-      if (item.deleted) {
-        known_backends_.erase(item.key);
-        continue;
-      }
       uint32_t len = item.value.size();
       TBackendDescriptor backend_descriptor;
       Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
@@ -1550,12 +1559,18 @@ void ImpalaServer::MembershipCallback(
     // Only register if all ports have been opened and are ready.
     if (services_started_.load()) AddLocalBackendToStatestore(subscriber_topic_updates);
 
+    // Process membership deletions.
+    for (const string& backend_id: delta.topic_deletions) {
+      known_backends_.erase(backend_id);
+    }
+
     // Create a set of known backend network addresses. Used to test for cluster
     // membership by network address.
     set<TNetworkAddress> current_membership;
     // Also reflect changes to the frontend. Initialized only if any_changes is true.
     TUpdateMembershipRequest update_req;
-    bool any_changes = !delta.topic_entries.empty() || !delta.is_delta;
+    bool any_changes = !delta.topic_entries.empty() || !delta.topic_deletions.empty() ||
+        !delta.is_delta;
     for (const BackendDescriptorMap::value_type& backend: known_backends_) {
       current_membership.insert(backend.second.address);
       if (any_changes) {

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index d0a4851..2d93c5f 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -85,6 +85,8 @@ const string STATESTORE_TOTAL_TOPIC_SIZE_BYTES = "statestore.total-topic-size-by
 const string STATESTORE_UPDATE_DURATION = "statestore.topic-update-durations";
 const string STATESTORE_HEARTBEAT_DURATION = "statestore.heartbeat-durations";
 
+const Statestore::TopicEntry::Value Statestore::TopicEntry::NULL_VALUE = "";
+
 // Initial version for each Topic registered by a Subscriber. Generally, the Topic will
 // have a Version that is the MAX() of all entries in the Topic, but this initial
 // value needs to be less than TopicEntry::TOPIC_ENTRY_INITIAL_VERSION to distinguish
@@ -122,13 +124,13 @@ class StatestoreThriftIf : public StatestoreServiceIf {
 
 void Statestore::TopicEntry::SetValue(const Statestore::TopicEntry::Value& bytes,
     TopicEntry::Version version) {
-  DCHECK_GT(bytes.size(), 0);
+  DCHECK(bytes == Statestore::TopicEntry::NULL_VALUE || bytes.size() > 0);
   value_ = bytes;
   version_ = version;
 }
 
 Statestore::TopicEntry::Version Statestore::Topic::Put(const string& key,
-    const Statestore::TopicEntry::Value& bytes, bool is_deleted) {
+    const Statestore::TopicEntry::Value& bytes) {
   TopicEntryMap::iterator entry_it = entries_.find(key);
   int64_t key_size_delta = 0;
   int64_t value_size_delta = 0;
@@ -145,7 +147,6 @@ Statestore::TopicEntry::Version Statestore::Topic::Put(const string& key,
   value_size_delta += bytes.size();
 
   entry_it->second.SetValue(bytes, ++last_version_);
-  entry_it->second.SetDeleted(is_deleted);
   topic_update_log_.insert(make_pair(entry_it->second.version(), key));
 
   total_key_size_bytes_ += key_size_delta;
@@ -167,10 +168,12 @@ void Statestore::Topic::DeleteIfVersionsMatch(TopicEntry::Version version,
     // entry
     topic_update_log_.erase(version);
     topic_update_log_.insert(make_pair(++last_version_, key));
+    total_value_size_bytes_ -= entry_it->second.value().size();
+    DCHECK_GE(total_value_size_bytes_, static_cast<int64_t>(0));
+
     value_size_metric_->Increment(entry_it->second.value().size());
     topic_size_metric_->Increment(entry_it->second.value().size());
-    entry_it->second.SetDeleted(true);
-    entry_it->second.SetVersion(last_version_);
+    entry_it->second.SetValue(Statestore::TopicEntry::NULL_VALUE, last_version_);
   }
 }
 
@@ -464,9 +467,11 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
 
   // At this point the updates are assumed to have been successfully processed by the
   // subscriber. Update the subscriber's max version of each topic.
-  for (const auto& topic_delta: update_state_request.topic_deltas) {
-    subscriber->SetLastTopicVersionProcessed(topic_delta.first,
-        topic_delta.second.to_version);
+  map<TopicEntryKey, TTopicDelta>::const_iterator topic_delta =
+      update_state_request.topic_deltas.begin();
+  for (; topic_delta != update_state_request.topic_deltas.end(); ++topic_delta) {
+    subscriber->SetLastTopicVersionProcessed(topic_delta->first,
+        topic_delta->second.to_version);
   }
 
   // Thirdly: perform any / all updates returned by the subscriber
@@ -495,8 +500,14 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
 
       Topic* topic = &topic_it->second;
       for (const TTopicItem& item: update.topic_entries) {
-        subscriber->AddTransientUpdate(update.topic_name, item.key,
-            topic->Put(item.key, item.value, item.deleted));
+        TopicEntry::Version version = topic->Put(item.key, item.value);
+        subscriber->AddTransientUpdate(update.topic_name, item.key, version);
+      }
+
+      for (const string& key: update.topic_deletions) {
+        TopicEntry::Version version =
+            topic->Put(key, Statestore::TopicEntry::NULL_VALUE);
+        subscriber->AddTransientUpdate(update.topic_name, key, version);
       }
     }
   }
@@ -530,25 +541,30 @@ void Statestore::GatherTopicUpdates(const Subscriber& subscriber,
       TopicUpdateLog::const_iterator next_update =
           topic.topic_update_log().upper_bound(last_processed_version);
 
-      uint64_t topic_size = 0;
+      int64_t deleted_key_size_bytes = 0;
       for (; next_update != topic.topic_update_log().end(); ++next_update) {
         TopicEntryMap::const_iterator itr = topic.entries().find(next_update->second);
         DCHECK(itr != topic.entries().end());
         const TopicEntry& topic_entry = itr->second;
-        // Don't send deleted entries for non-delta updates.
-        if (!topic_delta.is_delta && topic_entry.is_deleted()) {
-          continue;
+        if (topic_entry.value() == Statestore::TopicEntry::NULL_VALUE) {
+          if (!topic_delta.is_delta) {
+            deleted_key_size_bytes += itr->first.size();
+            continue;
+          }
+          topic_delta.topic_deletions.push_back(itr->first);
+        } else {
+          topic_delta.topic_entries.push_back(TTopicItem());
+          TTopicItem& topic_item = topic_delta.topic_entries.back();
+          topic_item.key = itr->first;
+          // TODO: Does this do a needless copy?
+          topic_item.value = topic_entry.value();
         }
-        topic_delta.topic_entries.push_back(TTopicItem());
-        TTopicItem& topic_item = topic_delta.topic_entries.back();
-        topic_item.key = itr->first;
-        topic_item.value = topic_entry.value();
-        topic_item.deleted = topic_entry.is_deleted();
-        topic_size += topic_item.key.size() + topic_item.value.size();
       }
 
       if (!topic_delta.is_delta &&
           topic.last_version() > Subscriber::TOPIC_INITIAL_VERSION) {
+        int64_t topic_size = topic.total_key_size_bytes() - deleted_key_size_bytes
+            + topic.total_value_size_bytes();
         VLOG_QUERY << "Preparing initial " << topic_delta.topic_name
                    << " topic update for " << subscriber.id() << ". Size = "
                    << PrettyPrinter::Print(topic_size, TUnit::BYTES);

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 38b8361..1488f7e 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -131,7 +131,8 @@ class Statestore : public CacheLineAligned {
 
  private:
   /// A TopicEntry is a single entry in a topic, and logically is a <string, byte string>
-  /// pair.
+  /// pair. If the byte string is NULL, the entry has been deleted, but may be retained to
+  /// track changes to send to subscribers.
   class TopicEntry {
    public:
     /// A Value is a string of bytes, for which std::string is a convenient representation.
@@ -145,38 +146,30 @@ class Statestore : public CacheLineAligned {
     /// The Version value used to initialize a new TopicEntry.
     static const Version TOPIC_ENTRY_INITIAL_VERSION = 1L;
 
-    /// Sets the value of this entry to the byte / length pair. The caller is responsible
-    /// for ensuring, if required, that the version parameter is larger than the
-    /// current version() TODO: Consider enforcing version monotonicity here.
-    void SetValue(const Value& bytes, Version version);
-
-    /// Sets a new version for this entry.
-    void SetVersion(Version version) { version_ = version; }
+    /// Representation of an empty Value. Must have size() == 0.
+    static const Value NULL_VALUE;
 
-    /// Sets the is_deleted_ flag for this entry.
-    void SetDeleted(bool is_deleted) { is_deleted_ = is_deleted; }
+    /// Sets the value of this entry to the byte / length pair. NULL_VALUE implies this
+    /// entry has been deleted.  The caller is responsible for ensuring, if required, that
+    /// the version parameter is larger than the current version(). TODO: Consider enforcing
+    /// version monotonicity here.
+    void SetValue(const Value& bytes, Version version);
 
-    TopicEntry() : version_(TOPIC_ENTRY_INITIAL_VERSION),
-        is_deleted_(false) { }
+    TopicEntry() : value_(NULL_VALUE), version_(TOPIC_ENTRY_INITIAL_VERSION) { }
 
     const Value& value() const { return value_; }
     uint64_t version() const { return version_; }
     uint32_t length() const { return value_.size(); }
-    bool is_deleted() const { return is_deleted_; }
 
    private:
-    /// Byte string value, owned by this TopicEntry. The value is opaque to the
-    /// statestore, and is interpreted only by subscribers.
+    /// Byte string value, owned by this TopicEntry. The value is opaque to the statestore,
+    /// and is interpreted only by subscribers.
     Value value_;
 
     /// The version of this entry. Every update is assigned a monotonically increasing
     /// version number so that only the minimal set of changes can be sent from the
     /// statestore to a subscriber.
     Version version_;
-
-    /// Indicates if the entry has been deleted. If true, the entry will still be
-    /// retained to track changes to send to subscribers.
-    bool is_deleted_;
   };
 
   /// Map from TopicEntryKey to TopicEntry, maintained by a Topic object.
@@ -199,21 +192,19 @@ class Statestore : public CacheLineAligned {
           total_value_size_bytes_(0L), key_size_metric_(key_size_metric),
           value_size_metric_(value_size_metric), topic_size_metric_(topic_size_metric) { }
 
-    /// Adds an entry with the given key and value (bytes). If is_deleted is
-    /// true the entry is considered deleted, and may be garbage collected in the future.
-    /// The entry is assigned a new version number by the Topic, and that version number
-    /// is returned.
+    /// Adds an entry with the given key. If bytes == NULL_VALUE, the entry is considered
+    /// deleted, and may be garbage collected in the future. The entry is assigned a new
+    /// version number by the Topic, and that version number is returned.
     //
     /// Must be called holding the topic lock
-    TopicEntry::Version Put(const TopicEntryKey& key, const TopicEntry::Value& bytes,
-        bool is_deleted);
+    TopicEntry::Version Put(const TopicEntryKey& key, const TopicEntry::Value& bytes);
 
     /// Utility method to support removing transient entries. We track the version numbers
     /// of entries added by subscribers, and remove entries with the same version number
     /// when that subscriber fails (the same entry may exist, but may have been updated by
     /// another subscriber giving it a new version number)
     //
-    /// Deletion means marking the entry as deleted and incrementing its version
+    /// Deletion means setting the entry's value to NULL and incrementing its version
     /// number.
     //
     /// Must be called holding the topic lock

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/common/thrift/CatalogInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogInternalService.thrift b/common/thrift/CatalogInternalService.thrift
index 5170298..5b68408 100644
--- a/common/thrift/CatalogInternalService.thrift
+++ b/common/thrift/CatalogInternalService.thrift
@@ -22,25 +22,15 @@ include "CatalogObjects.thrift"
 
 // Contains structures used internally by the Catalog Server.
 
-// Arguments to a GetCatalogDelta call.
-struct TGetCatalogDeltaRequest {
-  // The base catalog version from which the delta is computed.
-  1: required i64 from_version
-}
-
-// Response from a call to GetCatalogDelta. Contains a delta of catalog objects
-// (databases, tables/views, and functions) from the CatalogService's cache relative (>)
-// to the catalog version specified in TGetCatalogDelta.from_version.
-struct TGetCatalogDeltaResponse {
+// Response from a call to GetAllCatalogObjects. Contains all known Catalog objects
+// (databases, tables/views, and functions) from the CatalogService's cache.
+// What metadata is included for each object is based on the parameters used in
+// the request.
+struct TGetAllCatalogObjectsResponse {
   // The maximum catalog version of all objects in this response or 0 if the Catalog
   // contained no objects.
   1: required i64 max_catalog_version
 
-  // List of updated (new and modified) catalog objects for which the catalog verion is
-  // larger than TGetCatalotDeltaRequest.from_version.
-  2: required list<CatalogObjects.TCatalogObject> updated_objects
-
-  // List of deleted catalog objects for which the catalog version is larger than
-  // TGetCatalogDelta.from_version.
-  3: required list<CatalogObjects.TCatalogObject> deleted_objects
+  // List of catalog objects (empty list if no objects detected in the Catalog).
+  2: required list<CatalogObjects.TCatalogObject> objects
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/common/thrift/StatestoreService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index f04650e..60a0d0d 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -79,15 +79,8 @@ struct TTopicItem {
   1: required string key;
 
   // Byte-string value for this topic entry. May not be null-terminated (in that it may
-  // contain null bytes). It can be non-empty when deleted is true. This is needed when
-  // subscribers need additional information in order to process the deleted topics that
-  // is not included in the topic key (e.g. catalog version of deleted catalog objects).
+  // contain null bytes)
   2: required string value;
-
-  // If true, this item was deleted. When false, this TTopicItem need not be included in
-  // non-delta TTopicDelta's (since the latest version of every still-present topic will
-  // be included).
-  3: required bool deleted = false;
 }
 
 // Set of changes to a single topic, sent from the statestore to a subscriber as well as
@@ -96,14 +89,15 @@ struct TTopicDelta {
   // Name of the topic this delta applies to
   1: required string topic_name;
 
-  // When is_delta=true, a list of changes to topic entries, including deletions, within
-  // [from_version, to_version].
-  // When is_delta=false, this is the list of all non-delete topic entries for
-  // [0, to_version], which can be used to reconstruct the topic from scratch.
+  // List of changes to topic entries
   2: required list<TTopicItem> topic_entries;
 
-  // True if entries / deletions are relative to the topic at versions [0, from_version].
-  3: required bool is_delta;
+  // List of topic item keys whose entries have been deleted
+  3: required list<string> topic_deletions;
+
+  // True if entries / deletions are to be applied to in-memory state,
+  // otherwise topic_entries contains entire topic state.
+  4: required bool is_delta;
 
   // The Topic version range this delta covers. If there have been changes to the topic,
   // the update will include all changes in the range: [from_version, to_version).
@@ -111,17 +105,16 @@ struct TTopicDelta {
   // to_version. The from_version will always be 0 for non-delta updates.
   // If this is an update being sent from a subscriber to the statestore, the from_version
   // is set only when recovering from an inconsistent state, to the last version of the
-  // topic the subscriber successfully processed. The value of to_version doesn't depend
-  // on whether the update is delta or not.
-  4: optional i64 from_version
-  5: optional i64 to_version
+  // topic the subscriber successfully processed.
+  5: optional i64 from_version
+  6: optional i64 to_version
 
   // The minimum topic version of all subscribers to the topic. This can be used to
   // determine when all subscribers have successfully processed a specific update.
   // This is guaranteed because no subscriber will ever be sent a topic containing
   // keys with a version < min_subscriber_topic_version. Only used when sending an update
   // from the statestore to a subscriber.
-  6: optional i64 min_subscriber_topic_version
+  7: optional i64 min_subscriber_topic_version
 }
 
 // Description of a topic to subscribe to as part of a RegisterSubscriber call

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
index c00c460..27839b3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
@@ -17,7 +17,6 @@
 
 package org.apache.impala.catalog;
 
-import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -25,37 +24,24 @@ import java.util.TreeMap;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TTable;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 
 /**
- * Represents a log of deleted catalog objects.
+ * The impalad catalog cache can be modified by either a state store update or by a
+ * direct ("fast") update that applies the result of a catalog operation to the cache
+ * out-of-band of a state store update. This thread safe log tracks the divergence
+ * (due to direct updates to the cache) of this impalad's cache from the last state
+ * store update. This log is needed to ensure work is never undone. For example,
+ * consider the following sequence of events:
+ * t1: [Direct Update] - Add item A - (Catalog Version 9)
+ * t2: [Direct Update] - Drop item A - (Catalog Version 10)
+ * t3: [StateStore Update] - (From Catalog Version 9)
+ * This log is used to ensure the state store update in t3 does not undo the drop in t2.
  *
- * There are currently two use cases for this log:
- *
- * a) Processing catalog updates in the impalads
- *   The impalad catalog cache can be modified by either a state store update or by a
- *   direct ("fast") update that applies the result of a catalog operation to the cache
- *   out-of-band of a state store update. This thread safe log tracks the divergence
- *   (due to direct updates to the cache) of this impalad's cache from the last state
- *   store update. This log is needed to ensure work is never undone. For example,
- *   consider the following sequence of events:
- *   t1: [Direct Update] - Add item A - (Catalog Version 9)
- *   t2: [Direct Update] - Drop item A - (Catalog Version 10)
- *   t3: [StateStore Update] - (From Catalog Version 9)
- *   This log is used to ensure the state store update in t3 does not undo the drop in t2.
- *   Currently this only tracks objects that were dropped, since the catalog cache can be
- *   queried to check if an object was added. TODO: Also track object additions from async
- *   operations. This could be used to to "replay" the log in the case of a catalog reset
- *   ("invalidate metadata"). Currently, the catalog may briefly go back in time if
- *   "invalidate metadata" is run concurrently with async catalog operations.
- *
- * b) Building catalog topic updates in the catalogd
- *   The catalogd uses this log to identify deleted catalog objects. Deleted
- *   catalog objects are added to this log by the corresponding operations that delete
- *   them (e.g. dropTable()). While constructing a catalog update topic, we use the log to
- *   determine which catalog objects were deleted since the last catalog topic update.
- *   Once the catalog topic update is constructed, the old deleted catalog objects are
- *   garbage collected to prevent the log from growing indefinitely.
+ * Currently this only tracks objects that were dropped, since the catalog cache can be
+ * queried to check if an object was added. TODO: Also track object additions from async
+ * operations. This could be used to "replay" the log in the case of a catalog reset
+ * ("invalidate metadata"). Currently, the catalog may briefly go back in time if
+ * "invalidate metadata" is run concurrently with async catalog operations.
  */
 public class CatalogDeltaLog {
   // Map of the catalog version an object was removed from the catalog
@@ -72,15 +58,6 @@ public class CatalogDeltaLog {
   }
 
   /**
-   * Retrieve all the removed catalog objects with version > 'fromVersion'.
-   */
-  public synchronized List<TCatalogObject> retrieveObjects(long fromVersion) {
-    SortedMap<Long, TCatalogObject> objects =
-        removedCatalogObjects_.tailMap(fromVersion + 1);
-    return ImmutableList.<TCatalogObject>copyOf(objects.values());
-  }
-
-  /**
    * Given the current catalog version, removes all items with catalogVersion <
    * currectCatalogVersion. Such objects do not need to be tracked in the delta
    * log anymore because they are consistent with the state store's view of the
@@ -114,45 +91,30 @@ public class CatalogDeltaLog {
   }
 
   /**
-   * Returns true if the two objects have the same object type and key (generated using
-   * toCatalogObjectKey()).
-   * TODO: Use global object IDs everywhere instead of tracking catalog objects by
-   * generated keys.
-   */
-  private static boolean objectNamesMatch(TCatalogObject first, TCatalogObject second) {
-    return toCatalogObjectKey(first).equals(toCatalogObjectKey(second));
-  }
-
-  /**
-   * Returns a unique string key of a catalog object.
+   * Returns true if the two objects have the same object type and name.
+   * TODO: Use global object IDs everywhere instead of tracking catalog objects by name.
    */
-  public static String toCatalogObjectKey(TCatalogObject catalogObject)
-      throws IllegalStateException {
-    switch (catalogObject.getType()) {
+  private boolean objectNamesMatch(TCatalogObject first, TCatalogObject second) {
+    if (first.getType() != second.getType()) return false;
+    switch (first.getType()) {
       case DATABASE:
-        return "DATABASE:" + catalogObject.getDb().getDb_name().toLowerCase();
+        return first.getDb().getDb_name().equalsIgnoreCase(second.getDb().getDb_name());
       case TABLE:
       case VIEW:
-        TTable tbl = catalogObject.getTable();
-        return "TABLE:" + tbl.getDb_name().toLowerCase() + "." +
-            tbl.getTbl_name().toLowerCase();
+        TTable firstTbl = first.getTable();
+        return firstTbl.getDb_name().equalsIgnoreCase(second.getTable().getDb_name()) &&
+            firstTbl.getTbl_name().equalsIgnoreCase(second.getTable().getTbl_name());
       case FUNCTION:
-        return "FUNCTION:" + catalogObject.getFn().getName() + "(" +
-            catalogObject.getFn().getSignature() + ")";
+        return first.getFn().getSignature().equals(second.getFn().getSignature()) &&
+            first.getFn().getName().equals(second.getFn().getName());
       case ROLE:
-        return "ROLE:" + catalogObject.getRole().getRole_name().toLowerCase();
+        return first.getRole().getRole_name().equalsIgnoreCase(
+            second.getRole().getRole_name());
       case PRIVILEGE:
-        return "PRIVILEGE:" +
-            catalogObject.getPrivilege().getPrivilege_name().toLowerCase() + "." +
-            Integer.toString(catalogObject.getPrivilege().getRole_id());
-      case HDFS_CACHE_POOL:
-        return "HDFS_CACHE_POOL:" +
-            catalogObject.getCache_pool().getPool_name().toLowerCase();
-      case DATA_SOURCE:
-        return "DATA_SOURCE:" + catalogObject.getData_source().getName().toLowerCase();
-      default:
-        throw new IllegalStateException(
-            "Unsupported catalog object type: " + catalogObject.getType());
+        return first.getPrivilege().getPrivilege_name().equalsIgnoreCase(
+            second.getPrivilege().getPrivilege_name()) &&
+            first.getPrivilege().getRole_id() == second.getPrivilege().getRole_id();
+      default: return false;
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index a4f8608..d2a0a82 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.metastore.api.ResourceType;
 import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.ql.exec.FunctionUtils;
+import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.authorization.SentryConfig;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.FileSystemUtil;
@@ -58,7 +59,7 @@ import org.apache.impala.thrift.TCatalog;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TFunction;
-import org.apache.impala.thrift.TGetCatalogDeltaResponse;
+import org.apache.impala.thrift.TGetAllCatalogObjectsResponse;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TPrivilege;
 import org.apache.impala.thrift.TTable;
@@ -149,9 +150,6 @@ public class CatalogServiceCatalog extends Catalog {
   // Local temporary directory to copy UDF Jars.
   private static String localLibraryPath_;
 
-  // Log of deleted catalog objects.
-  private final CatalogDeltaLog deleteLog_;
-
   /**
    * Initialize the CatalogServiceCatalog. If 'loadInBackground' is true, table metadata
    * will be loaded in the background. 'initialHmsCnxnTimeoutSec' specifies the time (in
@@ -182,7 +180,6 @@ public class CatalogServiceCatalog extends Catalog {
       sentryProxy_ = null;
     }
     localLibraryPath_ = new String("file://" + localLibraryPath);
-    deleteLog_ = new CatalogDeltaLog();
   }
 
   // Timeout for acquiring a table lock
@@ -269,15 +266,8 @@ public class CatalogServiceCatalog extends Catalog {
         }
         // Remove dropped cache pools.
         for (String cachePoolName: droppedCachePoolNames) {
-          HdfsCachePool cachePool = hdfsCachePools_.remove(cachePoolName);
-          if (cachePool != null) {
-            cachePool.setCatalogVersion(incrementAndGetCatalogVersion());
-            TCatalogObject removedObject =
-                new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
-                    cachePool.getCatalogVersion());
-            removedObject.setCache_pool(cachePool.toThrift());
-            deleteLog_.addRemovedObject(removedObject);
-          }
+          hdfsCachePools_.remove(cachePoolName);
+          CatalogServiceCatalog.this.incrementAndGetCatalogVersion();
         }
       } finally {
         catalogLock_.writeLock().unlock();
@@ -307,140 +297,117 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
-   * Computes and returns a delta of catalog objects relative to 'fromVersion'. Takes a
-   * lock on the catalog to ensure this update contains a consistent snapshot of the
-   * catalog.
+   * Returns all known objects in the Catalog (Tables, Views, Databases, and
+   * Functions). Some metadata may be skipped for objects that have a catalog
+   * version < the specified "fromVersion". Takes a lock on the catalog to ensure this
+   * update contains a consistent snapshot of all items in the catalog. While holding the
+   * catalog lock, it locks each accessed table to protect against concurrent
+   * modifications.
    */
-  public TGetCatalogDeltaResponse getCatalogDelta(long fromVersion) {
+  public TGetAllCatalogObjectsResponse getCatalogObjects(long fromVersion) {
+    TGetAllCatalogObjectsResponse resp = new TGetAllCatalogObjectsResponse();
+    resp.setObjects(new ArrayList<TCatalogObject>());
+    resp.setMax_catalog_version(Catalog.INITIAL_CATALOG_VERSION);
     catalogLock_.readLock().lock();
     try {
-      TGetCatalogDeltaResponse resp = getCatalogObjects(fromVersion);
-      // Each update should contain a single "TCatalog" object which is used to
-      // pass overall state on the catalog, such as the current version and the
-      // catalog service id.
-      TCatalogObject catalog = new TCatalogObject();
-      catalog.setType(TCatalogObjectType.CATALOG);
-      // By setting the catalog version to the latest catalog version at this point,
-      // it ensure impalads will always bump their versions, even in the case where
-      // an object has been dropped.
-      long currentCatalogVersion = getCatalogVersion();
-      catalog.setCatalog_version(currentCatalogVersion);
-      catalog.setCatalog(new TCatalog(catalogServiceId_));
-      resp.addToUpdated_objects(catalog);
-
-      // The max version is the max catalog version of all items in the update.
-      resp.setMax_catalog_version(currentCatalogVersion);
-      deleteLog_.garbageCollect(currentCatalogVersion);
-      return resp;
-    } finally {
-      catalogLock_.readLock().unlock();
-    }
-  }
-
-  /**
-   * Identify and return the catalog objects that were added/modified/deleted in the
-   * catalog with versions > 'fromVersion'. The caller of this function must hold the
-   * catalog read lock to prevent concurrent modifications of the catalog.
-   */
-  private TGetCatalogDeltaResponse getCatalogObjects(long fromVersion) {
-    TGetCatalogDeltaResponse resp = new TGetCatalogDeltaResponse();
-    resp.setUpdated_objects(new ArrayList<TCatalogObject>());
-    resp.setDeleted_objects(new ArrayList<TCatalogObject>());
-    resp.setMax_catalog_version(Catalog.INITIAL_CATALOG_VERSION);
-
-    // process databases
-    for (Db db: getDbs(PatternMatcher.MATCHER_MATCH_ALL)) {
-      if (db.getCatalogVersion() > fromVersion) {
+      for (Db db: getDbs(PatternMatcher.MATCHER_MATCH_ALL)) {
         TCatalogObject catalogDb = new TCatalogObject(TCatalogObjectType.DATABASE,
             db.getCatalogVersion());
         catalogDb.setDb(db.toThrift());
-        resp.addToUpdated_objects(catalogDb);
-      }
-      // process tables
-      for (Table tbl: db.getTables()) {
-        TCatalogObject catalogTbl = new TCatalogObject(TCatalogObjectType.TABLE,
-            Catalog.INITIAL_CATALOG_VERSION);
-        // Protect the table from concurrent modifications.
-        tbl.getLock().lock();
-        try {
-          // Only add the extended metadata if this table's version is > fromVersion.
-          if (tbl.getCatalogVersion() > fromVersion) {
-            try {
-              catalogTbl.setTable(tbl.toThrift());
-            } catch (Exception e) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug(String.format("Error calling toThrift() on table %s: %s",
-                    tbl.getFullName(), e.getMessage()), e);
+        resp.addToObjects(catalogDb);
+
+        for (String tblName: db.getAllTableNames()) {
+          TCatalogObject catalogTbl = new TCatalogObject(TCatalogObjectType.TABLE,
+              Catalog.INITIAL_CATALOG_VERSION);
+
+          Table tbl = db.getTable(tblName);
+          if (tbl == null) {
+            LOG.error("Table: " + tblName + " was expected to be in the catalog " +
+                "cache. Skipping table for this update.");
+            continue;
+          }
+
+          // Protect the table from concurrent modifications.
+          tbl.getLock().lock();
+          try {
+            // Only add the extended metadata if this table's version is >=
+            // the fromVersion.
+            if (tbl.getCatalogVersion() >= fromVersion) {
+              try {
+                catalogTbl.setTable(tbl.toThrift());
+              } catch (Exception e) {
+                if (LOG.isTraceEnabled()) {
+                  LOG.trace(String.format("Error calling toThrift() on table %s.%s: %s",
+                      db.getName(), tblName, e.getMessage()), e);
+                }
+                continue;
               }
-              continue;
+              catalogTbl.setCatalog_version(tbl.getCatalogVersion());
+            } else {
+              catalogTbl.setTable(new TTable(db.getName(), tblName));
             }
-            catalogTbl.setCatalog_version(tbl.getCatalogVersion());
-            resp.addToUpdated_objects(catalogTbl);
+          } finally {
+            tbl.getLock().unlock();
           }
-        } finally {
-          tbl.getLock().unlock();
+          resp.addToObjects(catalogTbl);
+        }
+
+        for (Function fn: db.getFunctions(null, new PatternMatcher())) {
+          TCatalogObject function = new TCatalogObject(TCatalogObjectType.FUNCTION,
+              fn.getCatalogVersion());
+          function.setFn(fn.toThrift());
+          resp.addToObjects(function);
         }
       }
-      // process functions
-      for (Function fn: db.getFunctions(null, new PatternMatcher())) {
-        if (fn.getCatalogVersion() <= fromVersion) continue;
-        TCatalogObject function = new TCatalogObject(TCatalogObjectType.FUNCTION,
-            fn.getCatalogVersion());
-        function.setFn(fn.toThrift());
-        resp.addToUpdated_objects(function);
+
+      for (DataSource dataSource: getDataSources()) {
+        TCatalogObject catalogObj = new TCatalogObject(TCatalogObjectType.DATA_SOURCE,
+            dataSource.getCatalogVersion());
+        catalogObj.setData_source(dataSource.toThrift());
+        resp.addToObjects(catalogObj);
       }
-    }
-    // process data sources
-    for (DataSource dataSource: getDataSources()) {
-      if (dataSource.getCatalogVersion() <= fromVersion) continue;
-      TCatalogObject catalogObj = new TCatalogObject(TCatalogObjectType.DATA_SOURCE,
-          dataSource.getCatalogVersion());
-      catalogObj.setData_source(dataSource.toThrift());
-      resp.addToUpdated_objects(catalogObj);
-    }
-    // process cache pools
-    for (HdfsCachePool cachePool: hdfsCachePools_) {
-      if (cachePool.getCatalogVersion() <= fromVersion) continue;
-      TCatalogObject pool = new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
-          cachePool.getCatalogVersion());
-      pool.setCache_pool(cachePool.toThrift());
-      resp.addToUpdated_objects(pool);
-    }
-    // process roles and privileges
-    for (Role role: authPolicy_.getAllRoles()) {
-      if (role.getCatalogVersion() > fromVersion) {
+      for (HdfsCachePool cachePool: hdfsCachePools_) {
+        TCatalogObject pool = new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
+            cachePool.getCatalogVersion());
+        pool.setCache_pool(cachePool.toThrift());
+        resp.addToObjects(pool);
+      }
+
+      // Get all roles
+      for (Role role: authPolicy_.getAllRoles()) {
         TCatalogObject thriftRole = new TCatalogObject();
         thriftRole.setRole(role.toThrift());
         thriftRole.setCatalog_version(role.getCatalogVersion());
         thriftRole.setType(role.getCatalogObjectType());
-        resp.addToUpdated_objects(thriftRole);
-      }
-
-      for (RolePrivilege p: role.getPrivileges()) {
-        if (p.getCatalogVersion() <= fromVersion) continue;
-        TCatalogObject privilege = new TCatalogObject();
-        privilege.setPrivilege(p.toThrift());
-        privilege.setCatalog_version(p.getCatalogVersion());
-        privilege.setType(p.getCatalogObjectType());
-        resp.addToUpdated_objects(privilege);
+        resp.addToObjects(thriftRole);
+
+        for (RolePrivilege p: role.getPrivileges()) {
+          TCatalogObject privilege = new TCatalogObject();
+          privilege.setPrivilege(p.toThrift());
+          privilege.setCatalog_version(p.getCatalogVersion());
+          privilege.setType(p.getCatalogObjectType());
+          resp.addToObjects(privilege);
+        }
       }
-    }
 
-    Set<String> updatedCatalogObjects = Sets.newHashSet();
-    for (TCatalogObject catalogObj: resp.updated_objects) {
-      updatedCatalogObjects.add(CatalogDeltaLog.toCatalogObjectKey(catalogObj));
-    }
+      // Each update should contain a single "TCatalog" object which is used to
+      // pass overall state on the catalog, such as the current version and the
+      // catalog service id.
+      TCatalogObject catalog = new TCatalogObject();
+      catalog.setType(TCatalogObjectType.CATALOG);
+      // By setting the catalog version to the latest catalog version at this point,
+      // it ensures impalads will always bump their versions, even in the case where
+      // an object has been dropped.
+      catalog.setCatalog_version(getCatalogVersion());
+      catalog.setCatalog(new TCatalog(catalogServiceId_));
+      resp.addToObjects(catalog);
 
-    // Identify the catalog objects that were removed from the catalog for which the
-    // version is > 'fromVersion'. We need to make sure that we don't include "deleted"
-    // objects that were re-added to the catalog.
-    for (TCatalogObject removedObject: deleteLog_.retrieveObjects(fromVersion)) {
-      if (!updatedCatalogObjects.contains(CatalogDeltaLog.toCatalogObjectKey(
-          removedObject))) {
-        resp.addToDeleted_objects(removedObject);
-      }
+      // The max version is the max catalog version of all items in the update.
+      resp.setMax_catalog_version(getCatalogVersion());
+      return resp;
+    } finally {
+      catalogLock_.readLock().unlock();
     }
-    return resp;
   }
 
   /**
@@ -743,40 +710,6 @@ public class CatalogServiceCatalog extends Catalog {
           tblsToBackgroundLoad.add(new TTableName(dbName, tableName.toLowerCase()));
         }
       }
-
-      if (existingDb != null) {
-        // Identify any removed functions and add them to the delta log.
-        for (Map.Entry<String, List<Function>> e:
-             existingDb.getAllFunctions().entrySet()) {
-          for (Function fn: e.getValue()) {
-            if (newDb.getFunction(fn,
-                Function.CompareMode.IS_INDISTINGUISHABLE) == null) {
-              fn.setCatalogVersion(incrementAndGetCatalogVersion());
-              TCatalogObject removedObject =
-                  new TCatalogObject(TCatalogObjectType.FUNCTION, fn.getCatalogVersion());
-              removedObject.setFn(fn.toThrift());
-              deleteLog_.addRemovedObject(removedObject);
-            }
-          }
-        }
-
-        // Identify any deleted tables and add them to the delta log
-        Set<String> oldTableNames = Sets.newHashSet(existingDb.getAllTableNames());
-        Set<String> newTableNames = Sets.newHashSet(newDb.getAllTableNames());
-        oldTableNames.removeAll(newTableNames);
-        for (String removedTableName: oldTableNames) {
-          Table removedTable = IncompleteTable.createUninitializedTable(existingDb,
-              removedTableName);
-          removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
-          TCatalogObject removedObject =
-              new TCatalogObject(TCatalogObjectType.TABLE,
-                  removedTable.getCatalogVersion());
-          removedObject.setTable(new TTable());
-          removedObject.getTable().setDb_name(existingDb.getName());
-          removedObject.getTable().setTbl_name(removedTableName);
-          deleteLog_.addRemovedObject(removedObject);
-        }
-      }
       return Pair.create(newDb, tblsToBackgroundLoad);
     } catch (Exception e) {
       LOG.warn("Encountered an exception while invalidating database: " + dbName +
@@ -824,22 +757,6 @@ public class CatalogServiceCatalog extends Catalog {
         }
       }
       dbCache_.set(newDbCache);
-
-      // Identify any deleted databases and add them to the delta log.
-      Set<String> oldDbNames = oldDbCache.keySet();
-      Set<String> newDbNames = newDbCache.keySet();
-      oldDbNames.removeAll(newDbNames);
-      for (String dbName: oldDbNames) {
-        Db removedDb = oldDbCache.get(dbName);
-        Preconditions.checkNotNull(removedDb);
-        removedDb.setCatalogVersion(
-            CatalogServiceCatalog.this.incrementAndGetCatalogVersion());
-        TCatalogObject removedObject = new TCatalogObject(TCatalogObjectType.DATABASE,
-            removedDb.getCatalogVersion());
-        removedObject.setDb(removedDb.toThrift());
-        deleteLog_.addRemovedObject(removedObject);
-      }
-
       // Submit tables for background loading.
       for (TTableName tblName: tblsToBackgroundLoad) {
         tableLoadingMgr_.backgroundLoad(tblName);
@@ -1442,6 +1359,4 @@ public class CatalogServiceCatalog extends Catalog {
       tbl.getLock().unlock();
     }
   }
-
-  public CatalogDeltaLog getDeleteLog() { return deleteLog_; }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 70c9a61..4c959b2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -177,7 +177,7 @@ public class ImpaladCatalog extends Catalog {
     // its child tables/functions is fine. If that happens, the removal of the child
     // object will be a no-op.
     for (TCatalogObject catalogObject: req.getRemoved_objects()) {
-      removeCatalogObject(catalogObject);
+      removeCatalogObject(catalogObject, newCatalogVersion);
     }
     lastSyncedCatalogVersion_ = newCatalogVersion;
     // Cleanup old entries in the log.
@@ -319,10 +319,24 @@ public class ImpaladCatalog extends Catalog {
   /**
    *  Removes the matching TCatalogObject from the catalog, if one exists and its
    *  catalog version is < the catalog version of this drop operation.
+   *  Note that drop operations that come from statestore heartbeats always have a
+   *  version of 0. To determine the drop version for statestore updates,
+   *  the catalog version from the current update is used. This is okay because there
+   *  can never be a catalog update from the statestore that contains a drop
+   *  and an addition of the same object. For more details on how drop
+   *  versioning works, see CatalogServiceCatalog.java
    */
-  private void removeCatalogObject(TCatalogObject catalogObject) {
-    Preconditions.checkState(catalogObject.getCatalog_version() != 0);
-    long dropCatalogVersion = catalogObject.getCatalog_version();
+  private void removeCatalogObject(TCatalogObject catalogObject,
+      long currentCatalogUpdateVersion) {
+    // The TCatalogObject associated with a drop operation from a state store
+    // heartbeat will always have a version of zero. Because no update from
+    // the state store can contain both a drop and an addition of the same object,
+    // we can assume the drop version is the current catalog version of this update.
+    // If the TCatalogObject contains a version that != 0, it indicates the drop
+    // came from a direct update.
+    long dropCatalogVersion = catalogObject.getCatalog_version() == 0 ?
+        currentCatalogUpdateVersion : catalogObject.getCatalog_version();
+
     switch(catalogObject.getType()) {
       case DATABASE:
         removeDb(catalogObject.getDb(), dropCatalogVersion);

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 1683cc0..dcec430 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1091,7 +1091,6 @@ public class CatalogOpExecutor {
     removedObject.setCatalog_version(dataSource.getCatalogVersion());
     resp.result.addToRemoved_catalog_objects(removedObject);
     resp.result.setVersion(dataSource.getCatalogVersion());
-    catalog_.getDeleteLog().addRemovedObject(removedObject);
   }
 
   /**
@@ -1274,7 +1273,6 @@ public class CatalogOpExecutor {
     removedObject.getDb().setDb_name(params.getDb());
     resp.result.setVersion(removedObject.getCatalog_version());
     resp.result.addToRemoved_catalog_objects(removedObject);
-    catalog_.getDeleteLog().addRemovedObject(removedObject);
   }
 
   /**
@@ -1396,7 +1394,6 @@ public class CatalogOpExecutor {
     removedObject.getTable().setTbl_name(tableName.getTbl());
     removedObject.getTable().setDb_name(tableName.getDb());
     removedObject.setCatalog_version(resp.result.getVersion());
-    catalog_.getDeleteLog().addRemovedObject(removedObject);
     resp.result.addToRemoved_catalog_objects(removedObject);
   }
 
@@ -1524,9 +1521,6 @@ public class CatalogOpExecutor {
 
       if (!removedFunctions.isEmpty()) {
         resp.result.setRemoved_catalog_objects(removedFunctions);
-        for (TCatalogObject removedFnObject: removedFunctions) {
-          catalog_.getDeleteLog().addRemovedObject(removedFnObject);
-        }
       }
       resp.result.setVersion(catalog_.getCatalogVersion());
     }
@@ -2223,7 +2217,6 @@ public class CatalogOpExecutor {
     removedObject.setTable(new TTable(tableName.getDb(), tableName.getTbl()));
     removedObject.setCatalog_version(addedObject.getCatalog_version());
     response.result.addToRemoved_catalog_objects(removedObject);
-    catalog_.getDeleteLog().addRemovedObject(removedObject);
     response.result.addToUpdated_catalog_objects(addedObject);
     response.result.setVersion(addedObject.getCatalog_version());
   }
@@ -2816,7 +2809,6 @@ public class CatalogOpExecutor {
     catalogObject.setCatalog_version(role.getCatalogVersion());
     if (createDropRoleParams.isIs_drop()) {
       resp.result.addToRemoved_catalog_objects(catalogObject);
-      catalog_.getDeleteLog().addRemovedObject(catalogObject);
     } else {
       resp.result.addToUpdated_catalog_objects(catalogObject);
     }
@@ -2879,9 +2871,6 @@ public class CatalogOpExecutor {
       catalogObject.setPrivilege(rolePriv.toThrift());
       catalogObject.setCatalog_version(rolePriv.getCatalogVersion());
       updatedPrivs.add(catalogObject);
-      if (!grantRevokePrivParams.isIs_grant() && !privileges.get(0).isHas_grant_opt()) {
-        catalog_.getDeleteLog().addRemovedObject(catalogObject);
-      }
     }
 
     // TODO: Currently we only support sending back 1 catalog object in a "direct DDL"
@@ -3053,9 +3042,6 @@ public class CatalogOpExecutor {
           resp.result.setUpdated_catalog_objects(addedFuncs);
           resp.result.setRemoved_catalog_objects(removedFuncs);
           resp.result.setVersion(catalog_.getCatalogVersion());
-          for (TCatalogObject removedFn: removedFuncs) {
-            catalog_.getDeleteLog().addRemovedObject(removedFn);
-          }
         }
       }
     } else if (req.isSetTable_name()) {
@@ -3093,7 +3079,6 @@ public class CatalogOpExecutor {
         // processed as a direct DDL operation.
         if (tblWasRemoved.getRef()) {
           resp.getResult().addToRemoved_catalog_objects(updatedThriftTable);
-          catalog_.getDeleteLog().addRemovedObject(updatedThriftTable);
         } else {
           resp.getResult().addToUpdated_catalog_objects(updatedThriftTable);
         }

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/fe/src/main/java/org/apache/impala/service/JniCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index e945a3b..b56527b 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -38,8 +38,7 @@ import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TDdlExecRequest;
 import org.apache.impala.thrift.TFunction;
-import org.apache.impala.thrift.TGetCatalogDeltaResponse;
-import org.apache.impala.thrift.TGetCatalogDeltaRequest;
+import org.apache.impala.thrift.TGetAllCatalogObjectsResponse;
 import org.apache.impala.thrift.TGetDbsParams;
 import org.apache.impala.thrift.TGetDbsResult;
 import org.apache.impala.thrift.TGetFunctionsRequest;
@@ -120,11 +119,9 @@ public class JniCatalog {
   /**
    * Gets all catalog objects
    */
-  public byte[] getCatalogDelta(byte[] thriftGetCatalogDeltaReq)
-      throws ImpalaException, TException {
-    TGetCatalogDeltaRequest params = new TGetCatalogDeltaRequest();
-    JniUtil.deserializeThrift(protocolFactory_, params, thriftGetCatalogDeltaReq);
-    TGetCatalogDeltaResponse resp = catalog_.getCatalogDelta(params.getFrom_version());
+  public byte[] getCatalogObjects(long from_version) throws ImpalaException, TException {
+    TGetAllCatalogObjectsResponse resp =
+        catalog_.getCatalogObjects(from_version);
     TSerializer serializer = new TSerializer(protocolFactory_);
     return serializer.serialize(resp);
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index 1003dc7..e2b1715 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -311,12 +311,14 @@ class StatestoreSubscriber(object):
 
 class TestStatestore():
   def make_topic_update(self, topic_name, key_template="foo", value_template="bar",
-                        num_updates=1):
+                        num_updates=1, deletions=None):
     topic_entries = [
       Subscriber.TTopicItem(key=key_template + str(x), value=value_template + str(x))
       for x in xrange(num_updates)]
+    if deletions is None: deletions = []
     return Subscriber.TTopicDelta(topic_name=topic_name,
                                   topic_entries=topic_entries,
+                                  topic_deletions=deletions,
                                   is_delta=False)
 
   def test_registration_ids_different(self):
@@ -347,9 +349,11 @@ class TestStatestore():
         assert len(args.topic_deltas) == 1
         assert args.topic_deltas[topic_name].topic_entries == delta.topic_entries
         assert args.topic_deltas[topic_name].topic_name == delta.topic_name
+        assert args.topic_deltas[topic_name].topic_deletions == delta.topic_deletions
       elif sub.update_count == 3:
         # After the content-bearing update was processed, the next delta should be empty
         assert len(args.topic_deltas[topic_name].topic_entries) == 0
+        assert len(args.topic_deltas[topic_name].topic_deletions) == 0
 
       return DEFAULT_UPDATE_STATE_RESPONSE
 
@@ -457,7 +461,7 @@ class TestStatestore():
         assert len(args.topic_deltas[persistent_topic_name].topic_entries) == 1
         # Statestore should not send deletions when the update is not a delta, see
         # IMPALA-1891
-        assert args.topic_deltas[persistent_topic_name].topic_entries[0].deleted == False
+        assert len(args.topic_deltas[transient_topic_name].topic_deletions) == 0
       return DEFAULT_UPDATE_STATE_RESPONSE
 
     reg = [TTopicRegistration(topic_name=persistent_topic_name, is_transient=False),


Mime
View raw message