impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From he...@apache.org
Subject [3/7] incubator-impala git commit: IMPALA-4160: Remove Llama support.
Date Wed, 21 Sep 2016 04:00:40 GMT
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 5ee60d6..c8ebd5d 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -26,12 +26,10 @@
 
 #include "common/global-types.h"
 #include "common/status.h"
-#include "scheduling/query-resource-mgr.h"
 #include "util/promise.h"
 #include "util/runtime-profile.h"
 #include "gen-cpp/Types_types.h"  // for TNetworkAddress
 #include "gen-cpp/Frontend_types.h"
-#include "gen-cpp/ResourceBrokerService_types.h"
 
 namespace impala {
 
@@ -74,30 +72,19 @@ class QuerySchedule {
       const TQueryOptions& query_options, RuntimeProfile* summary_profile,
       RuntimeProfile::EventSequence* query_events);
 
-  /// Returns OK if reservation_ contains a matching resource for each
-  /// of the hosts in fragment_exec_params_. Returns an error otherwise.
-  Status ValidateReservation();
-
   const TUniqueId& query_id() const { return query_id_; }
   const TQueryExecRequest& request() const { return request_; }
   const TQueryOptions& query_options() const { return query_options_; }
   const std::string& request_pool() const { return request_pool_; }
   void set_request_pool(const std::string& pool_name) { request_pool_ = pool_name; }
-  bool HasReservation() const { return !reservation_.allocated_resources.empty(); }
-
-  /// Granted or timed out reservations need to be released. In both such cases,
-  /// the reservation_'s reservation_id is set.
-  bool NeedsRelease() const { return reservation_.__isset.reservation_id; }
 
-  /// Gets the estimated memory (bytes) and vcores per-node. Returns the user specified
-  /// estimate (MEM_LIMIT query parameter) if provided or the estimate from planning if
-  /// available, but is capped at the amount of physical memory to avoid problems if
-  /// either estimate is unreasonably large.
+  /// Gets the estimated memory (bytes) per-node. Returns the user specified estimate
+  /// (MEM_LIMIT query parameter) if provided or the estimate from planning if available,
+  /// but is capped at the amount of physical memory to avoid problems if either estimate
+  /// is unreasonably large.
   int64_t GetPerHostMemoryEstimate() const;
-  int16_t GetPerHostVCores() const;
   /// Total estimated memory for all nodes. set_num_hosts() must be set before calling.
   int64_t GetClusterMemoryEstimate() const;
-  void GetResourceHostport(const TNetworkAddress& src, TNetworkAddress* dst);
 
   /// Helper methods used by scheduler to populate this QuerySchedule.
   void AddScanRanges(int64_t delta) { num_scan_ranges_ += delta; }
@@ -116,10 +103,6 @@ class QuerySchedule {
   const boost::unordered_set<TNetworkAddress>& unique_hosts() const {
     return unique_hosts_;
   }
-  TResourceBrokerReservationResponse* reservation() { return &reservation_; }
-  const TResourceBrokerReservationRequest& reservation_request() const {
-    return reservation_request_;
-  }
   bool is_admitted() const { return is_admitted_; }
   void set_is_admitted(bool is_admitted) { is_admitted_ = is_admitted; }
   RuntimeProfile* summary_profile() { return summary_profile_; }
@@ -127,10 +110,6 @@ class QuerySchedule {
 
   void SetUniqueHosts(const boost::unordered_set<TNetworkAddress>& unique_hosts);
 
-  /// Populates reservation_request_ ready to submit a query to Llama for all initial
-  /// resources required for this query.
-  void PrepareReservationRequest(const std::string& pool, const std::string& user);
-
  private:
 
   /// These references are valid for the lifetime of this query schedule because they
@@ -165,18 +144,9 @@ class QuerySchedule {
   /// Request pool to which the request was submitted for admission.
   std::string request_pool_;
 
-  /// Reservation request to be submitted to Llama. Set in PrepareReservationRequest().
-  TResourceBrokerReservationRequest reservation_request_;
-
-  /// Fulfilled reservation request. Populated by scheduler.
-  TResourceBrokerReservationResponse reservation_;
-
   /// Indicates if the query has been admitted for execution.
   bool is_admitted_;
 
-  /// Resolves unique_hosts_ to node mgr addresses. Valid only after SetUniqueHosts() has
-  /// been called.
-  boost::scoped_ptr<ResourceResolver> resource_resolver_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/request-pool-service.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/request-pool-service.cc b/be/src/scheduling/request-pool-service.cc
index 9f3363d..ea2553e 100644
--- a/be/src/scheduling/request-pool-service.cc
+++ b/be/src/scheduling/request-pool-service.cc
@@ -44,6 +44,7 @@ static const string DEFAULT_USER = "default";
 
 DEFINE_string(fair_scheduler_allocation_path, "", "Path to the fair scheduler "
     "allocation file (fair-scheduler.xml).");
+// TODO: Rename / clean up now that Llama is removed (see IMPALA-4159).
 DEFINE_string(llama_site_path, "", "Path to the Llama configuration file "
     "(llama-site.xml). If set, fair_scheduler_allocation_path must also be set.");
 
@@ -74,7 +75,6 @@ DEFINE_bool(disable_pool_mem_limits, false, "Disables all per-pool mem limits.")
 DEFINE_bool(disable_pool_max_requests, false, "Disables all per-pool limits on the "
     "maximum number of running requests.");
 
-DECLARE_bool(enable_rm);
 
 // Pool name used when the configuration files are not specified.
 static const string DEFAULT_POOL_NAME = "default-pool";
@@ -94,12 +94,7 @@ RequestPoolService::RequestPoolService(MetricGroup* metrics) :
   resolve_pool_ms_metric_ =
       StatsMetric<double>::CreateAndRegister(metrics, RESOLVE_POOL_METRIC_NAME);
 
-  if (FLAGS_fair_scheduler_allocation_path.empty() &&
-      FLAGS_llama_site_path.empty()) {
-    if (FLAGS_enable_rm) {
-      CLEAN_EXIT_WITH_ERROR("If resource management is enabled, "
-          "-fair_scheduler_allocation_path is required.");
-    }
+  if (FLAGS_fair_scheduler_allocation_path.empty()) {
     default_pool_only_ = true;
     bool is_percent; // not used
     int64_t bytes_limit = ParseUtil::ParseMemSpec(FLAGS_default_pool_mem_limit,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 8b074d2..4c3a967 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -31,7 +31,6 @@
 #include "gen-cpp/PlanNodes_types.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
-#include "gen-cpp/ResourceBrokerService_types.h"
 
 namespace impala {
 
@@ -57,21 +56,6 @@ class Scheduler {
   /// Releases the reserved resources (if any) from the given schedule.
   virtual Status Release(QuerySchedule* schedule) = 0;
 
-  /// Notifies this scheduler that a resource reservation has been preempted by the
-  /// central scheduler (Yarn via Llama). All affected queries are cancelled
-  /// via their coordinator.
-  virtual void HandlePreemptedReservation(const TUniqueId& reservation_id) = 0;
-
-  /// Notifies this scheduler that a single resource with the given client resource id
-  /// has been preempted by the central scheduler (Yarn via Llama). All affected queries
-  /// are cancelled via their coordinator.
-  virtual void HandlePreemptedResource(const TUniqueId& client_resource_id) = 0;
-
-  /// Notifies this scheduler that a single resource with the given client resource id
-  /// has been lost by the central scheduler (Yarn via Llama). All affected queries
-  /// are cancelled via their coordinator.
-  virtual void HandleLostResource(const TUniqueId& client_resource_id) = 0;
-
   /// Initialises the scheduler, acquiring all resources needed to make
   /// scheduling decisions once this method returns.
   virtual Status Init() = 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/simple-scheduler-test.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler-test.cc b/be/src/scheduling/simple-scheduler-test.cc
index 76da6f9..7116e21 100644
--- a/be/src/scheduling/simple-scheduler-test.cc
+++ b/be/src/scheduling/simple-scheduler-test.cc
@@ -830,8 +830,8 @@ class SchedulerWrapper {
     scheduler_backend_address.hostname = scheduler_host.ip;
     scheduler_backend_address.port = scheduler_host.be_port;
 
-    scheduler_.reset(new SimpleScheduler(NULL, scheduler_backend_id,
-        scheduler_backend_address, &metrics_, NULL, NULL, NULL));
+    scheduler_.reset(new SimpleScheduler(
+        NULL, scheduler_backend_id, scheduler_backend_address, &metrics_, NULL, NULL));
     scheduler_->Init();
     // Initialize the scheduler backend maps.
     SendFullMembershipMap();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index f4b90cc..f3ba9a5 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -29,7 +29,6 @@
 
 #include "common/logging.h"
 #include "util/metrics.h"
-#include "resourcebroker/resource-broker.h"
 #include "runtime/exec-env.h"
 #include "runtime/coordinator.h"
 #include "service/impala-server.h"
@@ -43,11 +42,9 @@
 #include "util/container-util.h"
 #include "util/debug-util.h"
 #include "util/error-util.h"
-#include "util/llama-util.h"
 #include "util/mem-info.h"
 #include "util/parse-util.h"
 #include "util/runtime-profile-counters.h"
-#include "gen-cpp/ResourceBrokerService_types.h"
 
 #include "common/names.h"
 
@@ -58,9 +55,6 @@ using namespace strings;
 
 DECLARE_int32(be_port);
 DECLARE_string(hostname);
-DECLARE_bool(enable_rm);
-DECLARE_int32(rm_default_cpu_vcores);
-DECLARE_string(rm_default_memory);
 
 DEFINE_bool(disable_admission_control, false, "Disables admission control.");
 
@@ -79,8 +73,7 @@ const string SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC("impala-membership");
 
 SimpleScheduler::SimpleScheduler(StatestoreSubscriber* subscriber,
     const string& backend_id, const TNetworkAddress& backend_address,
-    MetricGroup* metrics, Webserver* webserver, ResourceBroker* resource_broker,
-    RequestPoolService* request_pool_service)
+    MetricGroup* metrics, Webserver* webserver, RequestPoolService* request_pool_service)
   : backend_config_(std::make_shared<const BackendConfig>()),
     metrics_(metrics->GetOrCreateChildGroup("scheduler")),
     webserver_(webserver),
@@ -90,7 +83,6 @@ SimpleScheduler::SimpleScheduler(StatestoreSubscriber* subscriber,
     total_assignments_(NULL),
     total_local_assignments_(NULL),
     initialized_(NULL),
-    resource_broker_(resource_broker),
     request_pool_service_(request_pool_service) {
   local_backend_descriptor_.address = backend_address;
 
@@ -99,32 +91,10 @@ SimpleScheduler::SimpleScheduler(StatestoreSubscriber* subscriber,
     admission_controller_.reset(
         new AdmissionController(request_pool_service_, metrics, backend_address));
   }
-
-  if (FLAGS_enable_rm) {
-    if (FLAGS_rm_default_cpu_vcores <= 0) {
-      LOG(ERROR) << "Bad value for --rm_default_cpu_vcores (must be postive): "
-                 << FLAGS_rm_default_cpu_vcores;
-      exit(1);
-    }
-    bool is_percent;
-    int64_t mem_bytes =
-        ParseUtil::ParseMemSpec(
-            FLAGS_rm_default_memory, &is_percent, MemInfo::physical_mem());
-    if (mem_bytes <= 1024 * 1024) {
-      LOG(ERROR) << "Bad value for --rm_default_memory (must be larger than 1M):"
-                 << FLAGS_rm_default_memory;
-      exit(1);
-    } else if (is_percent) {
-      LOG(ERROR) << "Must use absolute value for --rm_default_memory: "
-                 << FLAGS_rm_default_memory;
-      exit(1);
-    }
-  }
 }
 
 SimpleScheduler::SimpleScheduler(const vector<TNetworkAddress>& backends,
-    MetricGroup* metrics, Webserver* webserver, ResourceBroker* resource_broker,
-    RequestPoolService* request_pool_service)
+    MetricGroup* metrics, Webserver* webserver, RequestPoolService* request_pool_service)
   : backend_config_(std::make_shared<const BackendConfig>(backends)),
     metrics_(metrics),
     webserver_(webserver),
@@ -133,7 +103,6 @@ SimpleScheduler::SimpleScheduler(const vector<TNetworkAddress>& backends,
     total_assignments_(NULL),
     total_local_assignments_(NULL),
     initialized_(NULL),
-    resource_broker_(resource_broker),
     request_pool_service_(request_pool_service) {
   DCHECK(backends.size() > 0);
   local_backend_descriptor_.address = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);
@@ -289,10 +258,7 @@ void SimpleScheduler::UpdateMembership(
 
     // If this impalad is not in our view of the membership list, we should add it and
     // tell the statestore.
-    bool is_offline = ExecEnv::GetInstance() &&
-        ExecEnv::GetInstance()->impala_server()->IsOffline();
-    if (!is_offline &&
-        current_membership_.find(local_backend_id_) == current_membership_.end()) {
+    if (current_membership_.find(local_backend_id_) == current_membership_.end()) {
       VLOG(1) << "Registering local backend with statestore";
       subscriber_topic_updates->push_back(TTopicDelta());
       TTopicDelta& update = subscriber_topic_updates->back();
@@ -308,13 +274,6 @@ void SimpleScheduler::UpdateMembership(
                      << " " << status.GetDetail();
         subscriber_topic_updates->pop_back();
       }
-    } else if (is_offline &&
-        current_membership_.find(local_backend_id_) != current_membership_.end()) {
-      LOG(WARNING) << "Removing offline ImpalaServer from statestore";
-      subscriber_topic_updates->push_back(TTopicDelta());
-      TTopicDelta& update = subscriber_topic_updates->back();
-      update.topic_name = IMPALA_MEMBERSHIP_TOPIC;
-      update.topic_deletions.push_back(local_backend_id_);
     }
     if (metrics_ != NULL) {
       num_fragment_instances_metric_->set_value(current_membership_.size());
@@ -626,7 +585,6 @@ void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request
   for (const FragmentExecParams& exec_params: *fragment_exec_params) {
     unique_hosts.insert(exec_params.hosts.begin(), exec_params.hosts.end());
   }
-
   schedule->SetUniqueHosts(unique_hosts);
 }
 
@@ -723,38 +681,12 @@ Status SimpleScheduler::Schedule(Coordinator* coord, QuerySchedule* schedule) {
   schedule->set_request_pool(resolved_pool);
   schedule->summary_profile()->AddInfoString("Request Pool", resolved_pool);
 
-  if (ExecEnv::GetInstance()->impala_server()->IsOffline()) {
-    return Status("This Impala server is offline. Please retry your query later.");
-  }
-
   RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule->request(), schedule));
   ComputeFragmentHosts(schedule->request(), schedule);
   ComputeFragmentExecParams(schedule->request(), schedule);
   if (!FLAGS_disable_admission_control) {
     RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule));
   }
-  if (!FLAGS_enable_rm) return Status::OK();
-  string user = GetEffectiveUser(schedule->request().query_ctx.session);
-  if (user.empty()) user = "default";
-  schedule->PrepareReservationRequest(resolved_pool, user);
-  const TResourceBrokerReservationRequest& reservation_request =
-      schedule->reservation_request();
-  if (!reservation_request.resources.empty()) {
-    Status status = resource_broker_->Reserve(
-        reservation_request, schedule->reservation());
-    if (!status.ok()) {
-      // Warn about missing table and/or column stats if necessary.
-      const TQueryCtx& query_ctx = schedule->request().query_ctx;
-      if (!query_ctx.__isset.parent_query_id &&
-          query_ctx.__isset.tables_missing_stats &&
-          !query_ctx.tables_missing_stats.empty()) {
-        status.AddDetail(GetTablesMissingStatsWarning(query_ctx.tables_missing_stats));
-      }
-      return status;
-    }
-    RETURN_IF_ERROR(schedule->ValidateReservation());
-    AddToActiveResourceMaps(*schedule->reservation(), coord);
-  }
   return Status::OK();
 }
 
@@ -762,106 +694,9 @@ Status SimpleScheduler::Release(QuerySchedule* schedule) {
   if (!FLAGS_disable_admission_control) {
     RETURN_IF_ERROR(admission_controller_->ReleaseQuery(schedule));
   }
-  if (FLAGS_enable_rm && schedule->NeedsRelease()) {
-    DCHECK(resource_broker_ != NULL);
-    Status status = resource_broker_->ReleaseReservation(
-        schedule->reservation()->reservation_id);
-    // Remove the reservation from the active-resource maps even if there was an error
-    // releasing the reservation because the query running in the reservation is done.
-    RemoveFromActiveResourceMaps(*schedule->reservation());
-    RETURN_IF_ERROR(status);
-  }
   return Status::OK();
 }
 
-void SimpleScheduler::AddToActiveResourceMaps(
-    const TResourceBrokerReservationResponse& reservation, Coordinator* coord) {
-  lock_guard<mutex> l(active_resources_lock_);
-  active_reservations_[reservation.reservation_id] = coord;
-  map<TNetworkAddress, llama::TAllocatedResource>::const_iterator iter;
-  for (iter = reservation.allocated_resources.begin();
-      iter != reservation.allocated_resources.end();
-      ++iter) {
-    TUniqueId client_resource_id;
-    client_resource_id << iter->second.client_resource_id;
-    active_client_resources_[client_resource_id] = coord;
-  }
-}
-
-void SimpleScheduler::RemoveFromActiveResourceMaps(
-    const TResourceBrokerReservationResponse& reservation) {
-  lock_guard<mutex> l(active_resources_lock_);
-  active_reservations_.erase(reservation.reservation_id);
-  map<TNetworkAddress, llama::TAllocatedResource>::const_iterator iter;
-  for (iter = reservation.allocated_resources.begin();
-      iter != reservation.allocated_resources.end();
-      ++iter) {
-    TUniqueId client_resource_id;
-    client_resource_id << iter->second.client_resource_id;
-    active_client_resources_.erase(client_resource_id);
-  }
-}
-
-// TODO: Refactor the Handle*{Reservation,Resource} functions to avoid code duplication.
-void SimpleScheduler::HandlePreemptedReservation(const TUniqueId& reservation_id) {
-  VLOG_QUERY << "HandlePreemptedReservation client_id=" << reservation_id;
-  Coordinator* coord = NULL;
-  {
-    lock_guard<mutex> l(active_resources_lock_);
-    ActiveReservationsMap::iterator it = active_reservations_.find(reservation_id);
-    if (it != active_reservations_.end()) coord = it->second;
-  }
-  if (coord == NULL) {
-    LOG(WARNING) << "Ignoring preempted reservation id " << reservation_id
-                 << " because no active query using it was found.";
-  } else {
-    stringstream err_msg;
-    err_msg << "Reservation " << reservation_id << " was preempted";
-    Status status(err_msg.str());
-    coord->Cancel(&status);
-  }
-}
-
-void SimpleScheduler::HandlePreemptedResource(const TUniqueId& client_resource_id) {
-  VLOG_QUERY << "HandlePreemptedResource client_id=" << client_resource_id;
-  Coordinator* coord = NULL;
-  {
-    lock_guard<mutex> l(active_resources_lock_);
-    ActiveClientResourcesMap::iterator it =
-        active_client_resources_.find(client_resource_id);
-    if (it != active_client_resources_.end()) coord = it->second;
-  }
-  if (coord == NULL) {
-    LOG(WARNING) << "Ignoring preempted client resource id " << client_resource_id
-                 << " because no active query using it was found.";
-  } else {
-    stringstream err_msg;
-    err_msg << "Resource " << client_resource_id << " was preempted";
-    Status status(err_msg.str());
-    coord->Cancel(&status);
-  }
-}
-
-void SimpleScheduler::HandleLostResource(const TUniqueId& client_resource_id) {
-  VLOG_QUERY << "HandleLostResource preempting client_id=" << client_resource_id;
-  Coordinator* coord = NULL;
-  {
-    lock_guard<mutex> l(active_resources_lock_);
-    ActiveClientResourcesMap::iterator it =
-        active_client_resources_.find(client_resource_id);
-    if (it != active_client_resources_.end()) coord = it->second;
-  }
-  if (coord == NULL) {
-    LOG(WARNING) << "Ignoring lost client resource id " << client_resource_id
-                 << " because no active query using it was found.";
-  } else {
-    stringstream err_msg;
-    err_msg << "Resource " << client_resource_id << " was lost";
-    Status status(err_msg.str());
-    coord->Cancel(&status);
-  }
-}
-
 SimpleScheduler::AssignmentCtx::AssignmentCtx(
     const BackendConfig& backend_config,
     IntCounter* total_assignments, IntCounter* total_local_assignments)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/simple-scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.h b/be/src/scheduling/simple-scheduler.h
index dd119c2..8c96dde 100644
--- a/be/src/scheduling/simple-scheduler.h
+++ b/be/src/scheduling/simple-scheduler.h
@@ -36,12 +36,10 @@
 #include "scheduling/admission-controller.h"
 #include "scheduling/backend-config.h"
 #include "gen-cpp/Types_types.h"  // for TNetworkAddress
-#include "gen-cpp/ResourceBrokerService_types.h"
 #include "rapidjson/rapidjson.h"
 
 namespace impala {
 
-class ResourceBroker;
 class Coordinator;
 
 class SchedulerWrapper;
@@ -78,22 +76,18 @@ class SimpleScheduler : public Scheduler {
   ///  - backend_address - the address that this backend listens on
   SimpleScheduler(StatestoreSubscriber* subscriber, const std::string& backend_id,
       const TNetworkAddress& backend_address, MetricGroup* metrics, Webserver* webserver,
-      ResourceBroker* resource_broker, RequestPoolService* request_pool_service);
+      RequestPoolService* request_pool_service);
 
   /// Initialize with a list of <host:port> pairs in 'static' mode - i.e. the set of
   /// backends is fixed and will not be updated.
   SimpleScheduler(const std::vector<TNetworkAddress>& backends, MetricGroup* metrics,
-      Webserver* webserver, ResourceBroker* resource_broker,
-      RequestPoolService* request_pool_service);
+      Webserver* webserver, RequestPoolService* request_pool_service);
 
   /// Register with the subscription manager if required
   virtual impala::Status Init();
 
   virtual Status Schedule(Coordinator* coord, QuerySchedule* schedule);
   virtual Status Release(QuerySchedule* schedule);
-  virtual void HandlePreemptedReservation(const TUniqueId& reservation_id);
-  virtual void HandlePreemptedResource(const TUniqueId& client_resource_id);
-  virtual void HandleLostResource(const TUniqueId& client_resource_id);
 
  private:
   /// Map from a host's IP address to the next backend to be round-robin scheduled for
@@ -306,27 +300,6 @@ class SimpleScheduler : public Scheduler {
   /// Current number of backends
   IntGauge* num_fragment_instances_metric_;
 
-  /// Protect active_reservations_ and active_client_resources_.
-  boost::mutex active_resources_lock_;
-
-  /// Map from a Llama reservation id to the coordinator of the query using that
-  /// reservation. The map is used to cancel queries whose reservation has been preempted.
-  /// Entries are added in Schedule() calls that result in granted resource allocations.
-  /// Entries are removed in Release().
-  typedef boost::unordered_map<TUniqueId, Coordinator*> ActiveReservationsMap;
-  ActiveReservationsMap active_reservations_;
-
-  /// Map from client resource id to the coordinator of the query using that resource.
-  /// The map is used to cancel queries whose resource(s) have been preempted.
-  /// Entries are added in Schedule() calls that result in granted resource allocations.
-  /// Entries are removed in Release().
-  typedef boost::unordered_map<TUniqueId, Coordinator*> ActiveClientResourcesMap;
-  ActiveClientResourcesMap active_client_resources_;
-
-  /// Resource broker that mediates resource requests between Impala and the Llama.
-  /// Set to NULL if resource management is disabled.
-  ResourceBroker* resource_broker_;
-
   /// Used for user-to-pool resolution and looking up pool configurations. Not owned by
   /// us.
   RequestPoolService* request_pool_service_;
@@ -339,16 +312,6 @@ class SimpleScheduler : public Scheduler {
   BackendConfigPtr GetBackendConfig() const;
   void SetBackendConfig(const BackendConfigPtr& backend_config);
 
-  /// Add the granted reservation and resources to the active_reservations_ and
-  /// active_client_resources_ maps, respectively.
-  void AddToActiveResourceMaps(
-      const TResourceBrokerReservationResponse& reservation, Coordinator* coord);
-
-  /// Remove the given reservation and resources from the active_reservations_ and
-  /// active_client_resources_ maps, respectively.
-  void RemoveFromActiveResourceMaps(
-      const TResourceBrokerReservationResponse& reservation);
-
   /// Called asynchronously when an update is received from the subscription manager
   void UpdateMembership(const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
       std::vector<TTopicDelta>* subscriber_topic_updates);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index c749a6a..1b10aec 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -60,7 +60,6 @@
 #include "service/query-exec-state.h"
 #include "scheduling/simple-scheduler.h"
 #include "util/bit-util.h"
-#include "util/cgroups-mgr.h"
 #include "util/container-util.h"
 #include "util/debug-util.h"
 #include "util/error-util.h"
@@ -177,9 +176,9 @@ DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, that a query may be i
     "QUERY_TIMEOUT_S overrides this setting, but, if set, --idle_query_timeout represents"
     " the maximum allowable timeout.");
 
-DEFINE_string(local_nodemanager_url, "", "The URL of the local Yarn Node Manager's HTTP "
-    "interface, used to detect if the Node Manager fails");
-DECLARE_bool(enable_rm);
+// TODO: Remove for Impala 3.0.
+DEFINE_string(local_nodemanager_url, "", "Deprecated");
+
 DECLARE_bool(compact_catalog_topic);
 
 namespace impala {
@@ -361,12 +360,6 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
   query_expiration_thread_.reset(new Thread("impala-server", "query-expirer",
       bind<void>(&ImpalaServer::ExpireQueries, this)));
 
-  is_offline_ = false;
-  if (FLAGS_enable_rm) {
-    nm_failure_detection_thread_.reset(new Thread("impala-server", "nm-failure-detector",
-            bind<void>(&ImpalaServer::DetectNmFailures, this)));
-  }
-
   exec_env_->SetImpalaServer(this);
 }
 
@@ -783,9 +776,7 @@ Status ImpalaServer::ExecuteInternal(
     shared_ptr<QueryExecState>* exec_state) {
   DCHECK(session_state != NULL);
   *registered_exec_state = false;
-  if (IsOffline()) {
-    return Status("This Impala server is offline. Please retry your query later.");
-  }
+
   exec_state->reset(new QueryExecState(query_ctx, exec_env_, exec_env_->frontend(),
       this, session_state));
 
@@ -1938,81 +1929,6 @@ shared_ptr<ImpalaServer::QueryExecState> ImpalaServer::GetQueryExecState(
   }
 }
 
-void ImpalaServer::SetOffline(bool is_offline) {
-  lock_guard<mutex> l(is_offline_lock_);
-  is_offline_ = is_offline;
-  ImpaladMetrics::IMPALA_SERVER_READY->set_value(is_offline);
-}
-
-void ImpalaServer::DetectNmFailures() {
-  DCHECK(FLAGS_enable_rm);
-  if (FLAGS_local_nodemanager_url.empty()) {
-    LOG(WARNING) << "No NM address set (--nm_addr is empty), no NM failure detection "
-                 << "thread started";
-    return;
-  }
-  // We only want a network address to open a socket to, for now. Get rid of http(s)://
-  // prefix, and split the string into hostname:port.
-  if (istarts_with(FLAGS_local_nodemanager_url, "http://")) {
-    FLAGS_local_nodemanager_url =
-        FLAGS_local_nodemanager_url.substr(string("http://").size());
-  } else if (istarts_with(FLAGS_local_nodemanager_url, "https://")) {
-    FLAGS_local_nodemanager_url =
-        FLAGS_local_nodemanager_url.substr(string("https://").size());
-  }
-  vector<string> components;
-  split(components, FLAGS_local_nodemanager_url, is_any_of(":"));
-  if (components.size() < 2) {
-    LOG(ERROR) << "Could not parse network address from --local_nodemanager_url, no NM"
-               << " failure detection thread started";
-    return;
-  }
-  DCHECK_GE(components.size(), 2);
-  TNetworkAddress nm_addr =
-      MakeNetworkAddress(components[0], atoi(components[1].c_str()));
-
-  MissedHeartbeatFailureDetector failure_detector(MAX_NM_MISSED_HEARTBEATS,
-      MAX_NM_MISSED_HEARTBEATS / 2);
-  struct addrinfo* addr;
-  if (getaddrinfo(nm_addr.hostname.c_str(), components[1].c_str(), NULL, &addr)) {
-    LOG(WARNING) << "Could not resolve NM address: " << nm_addr << ". Error was: "
-                 << GetStrErrMsg();
-    return;
-  }
-  LOG(INFO) << "Starting NM failure-detection thread, NM at: " << nm_addr;
-  // True if the last time through the loop Impala had failed, otherwise false. Used to
-  // only change the offline status when there's a change in state.
-  bool last_failure_state = false;
-  while (true) {
-    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
-    if (sockfd >= 0) {
-      if (connect(sockfd, addr->ai_addr, sizeof(sockaddr)) < 0) {
-        failure_detector.UpdateHeartbeat(FLAGS_local_nodemanager_url, false);
-      } else {
-        failure_detector.UpdateHeartbeat(FLAGS_local_nodemanager_url, true);
-      }
-      ::close(sockfd);
-    } else {
-      LOG(ERROR) << "Could not create socket! Error was: " << GetStrErrMsg();
-    }
-    bool is_failed = (failure_detector.GetPeerState(FLAGS_local_nodemanager_url) ==
-        FailureDetector::FAILED);
-    if (is_failed != last_failure_state) {
-      if (is_failed) {
-        LOG(WARNING) <<
-            "ImpalaServer is going offline while local node-manager connectivity is bad";
-      } else {
-        LOG(WARNING) <<
-            "Node-manager connectivity has been restored. ImpalaServer is now online";
-      }
-      SetOffline(is_failed);
-    }
-    last_failure_state = is_failed;
-    SleepForMs(2000);
-  }
-  freeaddrinfo(addr);
-}
-
 void ImpalaServer::UpdateFilter(TUpdateFilterResult& result,
     const TUpdateFilterParams& params) {
   shared_ptr<QueryExecState> query_exec_state = GetQueryExecState(params.query_id, false);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index f756932..2104c5e 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -240,12 +240,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
   void CatalogUpdateCallback(const StatestoreSubscriber::TopicDeltaMap& topic_deltas,
       std::vector<TTopicDelta>* topic_updates);
 
-  /// Returns true if Impala is offline (and not accepting queries), false otherwise.
-  bool IsOffline() {
-    boost::lock_guard<boost::mutex> l(is_offline_lock_);
-    return is_offline_;
-  }
-
   /// Returns true if lineage logging is enabled, false otherwise.
   bool IsLineageLoggingEnabled();
 
@@ -633,15 +627,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
   /// FLAGS_idle_query_timeout seconds.
   void ExpireQueries();
 
-  /// Periodically opens a socket to FLAGS_local_nodemanager_url to check if the Yarn Node
-  /// Manager is running. If not, this method calls SetOffline(true), and when the NM
-  /// recovers, calls SetOffline(false). Only called (in nm_failure_detection_thread_) if
-  /// FLAGS_enable_rm is true.
-  void DetectNmFailures();
-
-  /// Set is_offline_ to the argument's value.
-  void SetOffline(bool offline);
-
   /// Guards query_log_ and query_log_index_
   boost::mutex query_log_lock_;
 
@@ -963,15 +948,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
 
   /// Container for a thread that runs ExpireQueries() if FLAGS_idle_query_timeout is set.
   boost::scoped_ptr<Thread> query_expiration_thread_;
-
-  /// Container thread for DetectNmFailures().
-  boost::scoped_ptr<Thread> nm_failure_detection_thread_;
-
-  /// Protects is_offline_
-  boost::mutex is_offline_lock_;
-
-  /// True if Impala server is offline, false otherwise.
-  bool is_offline_;
 };
 
 /// Create an ImpalaServer and Thrift servers.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/service/impalad-main.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index de1a56b..55d9ad6 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -54,6 +54,7 @@ DECLARE_int32(beeswax_port);
 DECLARE_int32(hs2_port);
 DECLARE_int32(be_port);
 DECLARE_string(principal);
+DECLARE_bool(enable_rm);
 
 int ImpaladMain(int argc, char** argv) {
   InitCommonRuntime(argc, argv, true);
@@ -66,6 +67,13 @@ int ImpaladMain(int argc, char** argv) {
   ABORT_IF_ERROR(HiveUdfCall::Init());
   InitFeSupport();
 
+  if (FLAGS_enable_rm) {
+    // TODO: Remove in Impala 3.0.
+    LOG(WARNING) << "*****************************************************************";
+    LOG(WARNING) << "Llama support has been deprecated. FLAGS_enable_rm has no effect.";
+    LOG(WARNING) << "*****************************************************************";
+  }
+
   // start backend service for the coordinator on be_port
   ExecEnv exec_env;
   StartThreadInstrumentation(exec_env.metrics(), exec_env.webserver());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index cea24bb..eb710fb 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -21,7 +21,6 @@
 
 #include "exprs/expr.h"
 #include "exprs/expr-context.h"
-#include "resourcebroker/resource-broker.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
@@ -48,7 +47,6 @@ using namespace strings;
 
 DECLARE_int32(catalog_service_port);
 DECLARE_string(catalog_service_host);
-DECLARE_bool(enable_rm);
 DECLARE_int64(max_result_cache_size);
 
 namespace impala {
@@ -428,34 +426,16 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
       query_exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;
   DCHECK(has_coordinator_fragment || query_exec_request.__isset.desc_tbl);
 
-  if (FLAGS_enable_rm) {
-    DCHECK(exec_env_->resource_broker() != NULL);
-  }
   schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
       exec_request_.query_options, &summary_profile_, query_events_));
   coord_.reset(new Coordinator(exec_request_.query_options, exec_env_, query_events_));
   Status status = exec_env_->scheduler()->Schedule(coord_.get(), schedule_.get());
-  if (FLAGS_enable_rm) {
-    if (status.ok()) {
-      stringstream reservation_request_ss;
-      reservation_request_ss << schedule_->reservation_request();
-      summary_profile_.AddInfoString("Resource reservation request",
-          reservation_request_ss.str());
-    }
-  }
 
   {
     lock_guard<mutex> l(lock_);
     RETURN_IF_ERROR(UpdateQueryStatus(status));
   }
 
-  if (FLAGS_enable_rm && schedule_->HasReservation()) {
-    // Add the granted reservation to the query profile.
-    stringstream reservation_ss;
-    reservation_ss << *schedule_->reservation();
-    summary_profile_.AddInfoString("Granted resource reservation", reservation_ss.str());
-    query_events_->MarkEvent("Resources reserved");
-  }
   status = coord_->Exec(*schedule_, &output_expr_ctxs_);
   {
     lock_guard<mutex> l(lock_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index da682f4..62fff80 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -36,7 +36,6 @@ add_library(Util
   bitmap.cc
   bit-util.cc
   bloom-filter.cc
-  cgroups-mgr.cc
   coding-util.cc
   codec.cc
   compress.cc
@@ -54,7 +53,6 @@ add_library(Util
   hdr-histogram.cc
   impalad-metrics.cc
   jni-util.cc
-  llama-util.cc
   logging-support.cc
   mem-info.cc
   memory-metrics.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/cgroups-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/util/cgroups-mgr.cc b/be/src/util/cgroups-mgr.cc
deleted file mode 100644
index e49d57c..0000000
--- a/be/src/util/cgroups-mgr.cc
+++ /dev/null
@@ -1,238 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "util/cgroups-mgr.h"
-
-#include <fstream>
-#include <sstream>
-#include <boost/filesystem.hpp>
-#include "util/debug-util.h"
-#include <gutil/strings/substitute.h>
-
-#include "common/names.h"
-
-using boost::filesystem::create_directory;
-using boost::filesystem::exists;
-using boost::filesystem::remove;
-using namespace impala;
-using namespace strings;
-
-namespace impala {
-
-// Suffix appended to Yarn resource ids to form an Impala-internal cgroups.
-const std::string IMPALA_CGROUP_SUFFIX = "_impala";
-
-// Yarn's default multiplier for translating virtual CPU cores into cgroup CPU shares.
-// See Yarn's CgroupsLCEResourcesHandler.java for more details.
-const int32_t CPU_DEFAULT_WEIGHT = 1024;
-
-CgroupsMgr::CgroupsMgr(MetricGroup* metrics) {
-  active_cgroups_metric_ = metrics->AddGauge<int64_t>("cgroups-mgr.active-cgroups", 0);
-}
-
-Status CgroupsMgr::Init(const string& cgroups_hierarchy_path,
-      const string& staging_cgroup) {
-  cgroups_hierarchy_path_ = cgroups_hierarchy_path;
-  staging_cgroup_ = staging_cgroup;
-  // Set up the staging cgroup for Impala to retire execution threads into.
-  RETURN_IF_ERROR(CreateCgroup(staging_cgroup, true));
-  return Status::OK();
-}
-
-string CgroupsMgr::UniqueIdToCgroup(const string& unique_id) const {
-  if (unique_id.empty()) return "";
-  return unique_id + IMPALA_CGROUP_SUFFIX;
-}
-
-int32_t CgroupsMgr::VirtualCoresToCpuShares(int16_t v_cpu_cores) {
-  if (v_cpu_cores <= 0) return -1;
-  return CPU_DEFAULT_WEIGHT * v_cpu_cores;
-}
-
-Status CgroupsMgr::CreateCgroup(const string& cgroup, bool if_not_exists) const {
-  const string& cgroup_path = Substitute("$0/$1", cgroups_hierarchy_path_, cgroup);
-  try {
-    // Returns false if the dir already exists, otherwise throws an exception.
-    if (!create_directory(cgroup_path) && !if_not_exists) {
-      stringstream err_msg;
-      err_msg << "Failed to create CGroup at path " << cgroup_path
-              << ". Path already exists.";
-      return Status(err_msg.str());
-    }
-    LOG(INFO) << "Created CGroup " << cgroup_path;
-  } catch (std::exception& e) {
-    stringstream err_msg;
-    err_msg << "Failed to create CGroup at path " << cgroup_path << ". " << e.what();
-    return Status(err_msg.str());
-  }
-  return Status::OK();
-}
-
-Status CgroupsMgr::DropCgroup(const string& cgroup, bool if_exists) const {
-  const string& cgroup_path = Substitute("$0/$1", cgroups_hierarchy_path_, cgroup);
-  LOG(INFO) << "Dropping CGroup " << cgroups_hierarchy_path_ << " " << cgroup;
-  try {
-    if(!remove(cgroup_path) && !if_exists) {
-      stringstream err_msg;
-      err_msg << "Failed to create CGroup at path " << cgroup_path
-              << ". Path does not exist.";
-      return Status(err_msg.str());
-    }
-  } catch (std::exception& e) {
-    stringstream err_msg;
-    err_msg << "Failed to drop CGroup at path " << cgroup_path << ". " << e.what();
-    return Status(err_msg.str());
-  }
-  return Status::OK();
-}
-
-Status CgroupsMgr::SetCpuShares(const string& cgroup, int32_t num_shares) {
-  string cgroup_path;
-  string tasks_path;
-  RETURN_IF_ERROR(GetCgroupPaths(cgroup, &cgroup_path, &tasks_path));
-
-  const string& cpu_shares_path = Substitute("$0/$1", cgroup_path, "cpu.shares");
-  ofstream cpu_shares(tasks_path.c_str(), ios::out | ios::trunc);
-  if (!cpu_shares.is_open()) {
-    stringstream err_msg;
-    err_msg << "CGroup CPU shares file: " << cpu_shares_path
-            << " is not writable by Impala";
-    return Status(err_msg.str());
-  }
-
-  LOG(INFO) << "Setting CPU shares of CGroup " << cgroup_path << " to " << num_shares;
-  cpu_shares << num_shares << endl;
-  return Status::OK();
-}
-
-Status CgroupsMgr::GetCgroupPaths(const std::string& cgroup,
-    std::string* cgroup_path, std::string* tasks_path) const {
-  stringstream cgroup_path_ss;
-  cgroup_path_ss << cgroups_hierarchy_path_ << "/" << cgroup;
-  *cgroup_path = cgroup_path_ss.str();
-  if (!exists(*cgroup_path)) {
-    stringstream err_msg;
-    err_msg << "CGroup " << *cgroup_path << " does not exist";
-    return Status(err_msg.str());
-  }
-
-  stringstream tasks_path_ss;
-  tasks_path_ss << *cgroup_path << "/tasks";
-  *tasks_path = tasks_path_ss.str();
-  if (!exists(*tasks_path)) {
-    stringstream err_msg;
-    err_msg << "CGroup " << *cgroup_path << " does not have a /tasks file";
-    return Status(err_msg.str());
-  }
-  return Status::OK();
-}
-
-Status CgroupsMgr::AssignThreadToCgroup(const Thread& thread,
-    const string& cgroup) const {
-  string cgroup_path;
-  string tasks_path;
-  RETURN_IF_ERROR(GetCgroupPaths(cgroup, &cgroup_path, &tasks_path));
-
-  ofstream tasks(tasks_path.c_str(), ios::out | ios::app);
-  if (!tasks.is_open()) {
-    stringstream err_msg;
-    err_msg << "CGroup tasks file: " << tasks_path << " is not writable by Impala";
-    return Status(err_msg.str());
-  }
-  tasks << thread.tid() << endl;
-
-  VLOG_ROW << "Thread " << thread.tid() << " moved to CGroup " << cgroup_path;
-  tasks.close();
-  return Status::OK();
-}
-
-Status CgroupsMgr::RelocateThreads(const string& src_cgroup,
-    const string& dst_cgroup) const {
-  string src_cgroup_path;
-  string src_tasks_path;
-  RETURN_IF_ERROR(GetCgroupPaths(src_cgroup, &src_cgroup_path, &src_tasks_path));
-
-  string dst_cgroup_path;
-  string dst_tasks_path;
-  RETURN_IF_ERROR(GetCgroupPaths(dst_cgroup, &dst_cgroup_path, &dst_tasks_path));
-
-  ifstream src_tasks(src_tasks_path.c_str());
-  if (!src_tasks) {
-    stringstream err_msg;
-    err_msg << "Failed to open source CGroup tasks file at: " << src_tasks_path;
-    return Status(err_msg.str());
-  }
-
-  ofstream dst_tasks(dst_tasks_path.c_str(), ios::out | ios::app);
-  if (!dst_tasks) {
-    stringstream err_msg;
-    err_msg << "Failed to open destination CGroup tasks file at: " << dst_tasks_path;
-    return Status(err_msg.str());
-  }
-
-  int32_t tid;
-  while (src_tasks >> tid) {
-    dst_tasks << tid << endl;
-    // Attempting to write a non-existent tid/pid will result in an error,
-    // so clear the error flags after every append.
-    dst_tasks.clear();
-    VLOG_ROW << "Relocating thread id " << tid << " from " << src_tasks_path
-             << " to " << dst_tasks_path;
-  }
-
-  return Status::OK();
-}
-
-Status CgroupsMgr::RegisterFragment(const TUniqueId& fragment_instance_id,
-    const string& cgroup, bool* is_first) {
-  if (cgroup.empty() || cgroups_hierarchy_path_.empty()) return Status::OK();
-
-  LOG(INFO) << "Registering fragment " << PrintId(fragment_instance_id)
-            << " with CGroup " << cgroups_hierarchy_path_ << "/" << cgroup;
-  lock_guard<mutex> l(active_cgroups_lock_);
-  if (++active_cgroups_[cgroup] == 1) {
-    *is_first = true;
-    RETURN_IF_ERROR(CreateCgroup(cgroup, false));
-    active_cgroups_metric_->Increment(1);
-  } else {
-    *is_first = false;
-  }
-  return Status::OK();
-}
-
-Status CgroupsMgr::UnregisterFragment(const TUniqueId& fragment_instance_id,
-    const string& cgroup) {
-  if (cgroup.empty() || cgroups_hierarchy_path_.empty()) return Status::OK();
-
-  LOG(INFO) << "Unregistering fragment " << PrintId(fragment_instance_id)
-            << " from CGroup " << cgroups_hierarchy_path_ << "/" << cgroup;
-  lock_guard<mutex> l(active_cgroups_lock_);
-  unordered_map<string, int32_t>::iterator entry = active_cgroups_.find(cgroup);
-  DCHECK(entry != active_cgroups_.end());
-
-  int32_t* ref_count = &entry->second;
-  --(*ref_count);
-  if (*ref_count == 0) {
-    RETURN_IF_ERROR(RelocateThreads(cgroup, staging_cgroup_));
-    RETURN_IF_ERROR(DropCgroup(cgroup, false));
-    active_cgroups_metric_->Increment(-1);
-    active_cgroups_.erase(entry);
-  }
-  return Status::OK();
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/cgroups-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/util/cgroups-mgr.h b/be/src/util/cgroups-mgr.h
deleted file mode 100644
index 2e52f6b..0000000
--- a/be/src/util/cgroups-mgr.h
+++ /dev/null
@@ -1,175 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_UTIL_CGROUPS_MGR_H
-#define IMPALA_UTIL_CGROUPS_MGR_H
-
-#include <string>
-#include <boost/thread/mutex.hpp>
-#include <boost/unordered_map.hpp>
-#include "common/status.h"
-#include "util/metrics.h"
-#include "util/thread.h"
-
-namespace impala {
-
-/// Control Groups, or 'cgroups', are a Linux-specific mechanism for arbitrating resources
-/// amongst threads.
-//
-/// CGroups are organised in a forest of 'hierarchies', each of which are mounted at a path
-/// in the filesystem. Each hierarchy contains one or more cgroups, arranged
-/// hierarchically. Each hierarchy has one or more 'subsystems' attached. Each subsystem
-/// represents a resource to manage, so for example there is a CPU subsystem and a MEMORY
-/// subsystem. There are rules about when subsystems may be attached to more than one
-/// hierarchy, which are out of scope of this description.
-//
-/// Each thread running on a kernel with cgroups enabled belongs to exactly one cgroup in
-/// every hierarchy at once. Impala is only concerned with a single hierarchy that assigns
-/// CPU resources in the first instance. Threads are assigned to cgroups by writing their
-/// thread ID to a file in the special cgroup filesystem.
-//
-/// For more information:
-/// access.redhat.com/site/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Resource_Management_Guide/
-/// www.kernel.org/doc/Documentation/cgroups/cgroups.txt
-
-/// Manages the lifecycle of Impala-internal cgroups as well as the assignment of
-/// execution threads into cgroups.
-/// To execute queries Impala requests resources from Yarn via the Llama. Yarn returns
-/// granted resources via the Llama in the form or "RM resource ids" that conventionally
-/// correspond to a CGroups that the Yarn NM creates. Instead of directly using the
-/// NM-provided CGroups, Impala creates and manages its own CGroups for the
-/// following reasons:
-/// 1. In typical CM/Yarn setups, Impala would not have permissions to write to the tasks
-///    file of NM-provided CGroups. It is arguably not even desirable (e.g., for security
-///    reasons) for external process to be able to manipulate the permissions of
-///    NM-generated CGroups either directly or indirectly.
-/// 2. Yarn-granted CGroups are created asynchronously (the AM calls to create the
-///    CGroups are non-blocking). From Impala's perspective that means that once Impala
-///    receives notice from the Llama that resources have been granted, it cannot
-///    assume that the corresponding containers have been created (although the Yarn
-///    NMs eventually will). While each of Impala's plan fragments could wait for the
-///    CGroups to be created, it seems unnecessarily complicated and slow to do so.
-/// 3. Impala will probably want to manage its own CGroups eventually, e.g., for
-///    optimistic query scheduling.
-//
-/// In summary, the typical CGroups-related flow of an Impala query is as follows:
-/// 1. Impala receives granted resources from Llama and sends out plan fragments
-/// 2. On each node execution such a fragment, convert the Yarn resource id into
-///    a CGroup that Impala should create and assign the query's threads to
-/// 3. Register the fragment(s) and the CGroup for the query with the
-///    node-local CGroup manager. The registration creates the CGroup maintains a
-///    count of all fragments using that CGroup.
-/// 4. Execute the fragments, assigning threads into the Impala-managed CGroup.
-/// 5. Complete the fragments by unregistering them with the CGroup from the node-local
-///    CGroups manager. When the last fragment for a CGroup is unregistered, all threads
-///    from that CGroup are relocated into a special staging CGroup, so that the now
-///    unused CGroup can safely be deleted (otherwise, we'd have to wait for the OS to
-///    drain all entries from the CGroup's tasks file)
-class CgroupsMgr {
- public:
-  CgroupsMgr(MetricGroup* metrics);
-
-  /// Sets the cgroups mgr's corresponding members and creates the staging cgroup
-  /// under <cgroups_hierarchy_path>/<staging_cgroup>. Returns a non-OK status if
-  /// creation of the staging cgroup failed, e.g., because of insufficient privileges.
-  Status Init(const std::string& cgroups_hierarchy_path,
-      const std::string& staging_cgroup);
-
-  /// Returns the cgroup Impala should create and use for enforcing granted resources
-  /// identified by the given unique ID (which usually corresponds to a query ID). Returns
-  /// an empty string if unique_id is empty.
-  std::string UniqueIdToCgroup(const std::string& unique_id) const;
-
-  /// Returns the cgroup CPU shares corresponding to the given number of virtual cores.
-  /// Returns -1 if v_cpu_cores is <= 0 (which is invalid).
-  int32_t VirtualCoresToCpuShares(int16_t v_cpu_cores);
-
-  /// Informs the cgroups mgr that a plan fragment intends to use the given cgroup.
-  /// If this is the first fragment requesting use of cgroup, then the cgroup will
-  /// be created and *is_first will be set to true (otherwise to false). In any case the
-  /// reference count active_cgroups_[cgroup] is incremented. Returns a non-OK status
-  /// if there was an error creating the cgroup.
-  Status RegisterFragment(const TUniqueId& fragment_instance_id,
-      const std::string& cgroup, bool* is_first);
-
-  /// Informs the cgroups mgr that a plan fragment using the given cgroup is complete.
-  /// Decrements the corresponding reference count active_cgroups_[cgroup]. If the
-  /// reference count reaches zero this function relocates all thread ids from
-  /// the cgroup to the staging_cgroup_ and drops cgroup (a cgroup with active thread ids
-  /// cannot be dropped, so we relocate the thread ids first).
-  /// Returns a non-OK status there was an error creating the cgroup.
-  Status UnregisterFragment(const TUniqueId& fragment_instance_id,
-      const std::string& cgroup);
-
-  /// Creates a cgroup at <cgroups_hierarchy_path_>/<cgroup>. Returns a non-OK status
-  /// if the cgroup creation failed, e.g., because of insufficient privileges.
-  /// If is_not_exists is true then no error is returned if the cgroup already exists.
-  Status CreateCgroup(const std::string& cgroup, bool if_not_exists) const;
-
-  /// Drops the cgroup at <cgroups_hierarchy_path_>/<cgroup>. Returns a non-OK status
-  /// if the cgroup deletion failed, e.g., because of insufficient privileges.
-  /// If if_exists is true then no error is returned if the cgroup does not exist.
-  Status DropCgroup(const std::string& cgroup, bool if_exists) const;
-
-  /// Sets the number of CPU shares for the given cgroup by writing num_shares into the
-  /// cgroup's cpu.shares file. Returns a non-OK status if there was an error writing
-  /// to the file, e.g., because of insufficient privileges.
-  Status SetCpuShares(const std::string& cgroup, int32_t num_shares);
-
-  /// Assigns a given thread to a cgroup, by writing its thread id to
-  /// <cgroups_hierarchy_path_>/<cgroup>/tasks. If there is no file at that
-  /// location, returns an error. Otherwise no attempt is made to check that the
-  /// target belongs to a cgroup hierarchy due to the cost of reading and parsing
-  /// cgroup information from the filesystem.
-  Status AssignThreadToCgroup(const Thread& thread, const std::string& cgroup) const;
-
-  /// Reads the <cgroups_hierarchy_path_>/<src_cgroup>/tasks file and writing all the
-  /// contained thread ids to <cgroups_hierarchy_path_>/<dst_cgroup>/tasks.
-  /// Assumes that the destination cgroup has already been created. Returns a non-OK
-  /// status if there was an error reading src_cgroup and/or writing dst_cgroup.
-  Status RelocateThreads(const std::string& src_cgroup,
-      const std::string& dst_cgroup) const;
-
- private:
-  /// Checks that the cgroups_hierarchy_path_ and the given cgroup under it exists.
-  /// Returns an error if either of them do not exist.
-  /// Returns the absolute cgroup path and the absolute path to its tasks file.
-  Status GetCgroupPaths(const std::string& cgroup,
-      std::string* cgroup_path, std::string* tasks_path) const;
-
-  /// Number of currently active Impala-managed cgroups.
-  IntGauge* active_cgroups_metric_;
-
-  /// Root of the CPU cgroup hierarchy. Created cgroups are placed directly under it.
-  std::string cgroups_hierarchy_path_;
-
-  /// Cgroup that threads from completed queries are relocated into such that the
-  /// query's cgroup can be dropped.
-  std::string staging_cgroup_;
-
-  /// Protects active_cgroups_.
-  boost::mutex active_cgroups_lock_;
-
-  /// Process-wide map from cgroup to number of fragments using the cgroup.
-  /// A cgroup can be safely dropped once the number of fragments in the cgroup,
-  /// according to this map, reaches zero.
-  boost::unordered_map<std::string, int32_t> active_cgroups_;
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/debug-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index 4b17ed7..a02a288 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -32,7 +32,6 @@
 #include "gen-cpp/RuntimeProfile_types.h"
 #include "gen-cpp/ImpalaService_types.h"
 #include "gen-cpp/parquet_types.h"
-#include "gen-cpp/Llama_types.h"
 
 #include "runtime/descriptors.h" // for SchemaPath
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/llama-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/llama-util.cc b/be/src/util/llama-util.cc
deleted file mode 100644
index 82f2bd6..0000000
--- a/be/src/util/llama-util.cc
+++ /dev/null
@@ -1,152 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "util/llama-util.h"
-
-#include <sstream>
-#include <boost/algorithm/string/join.hpp>
-#include <boost/algorithm/string.hpp>
-
-#include "common/names.h"
-#include "util/debug-util.h"
-#include "util/uid-util.h"
-
-using boost::algorithm::is_any_of;
-using boost::algorithm::join;
-using boost::algorithm::split;
-using namespace llama;
-
-namespace llama {
-
-string PrintId(const TUniqueId& id, const string& separator) {
-  return PrintId(impala::CastTUniqueId<TUniqueId, impala::TUniqueId>(id), separator);
-}
-
-ostream& operator<<(ostream& os, const TUniqueId& id) {
-  os << hex << id.hi << ":" << id.lo;
-  return os;
-}
-
-ostream& operator<<(ostream& os, const TNetworkAddress& address) {
-  os << address.hostname << ":" << dec << address.port;
-  return os;
-}
-
-ostream& operator<<(ostream& os, const TResource& resource) {
-  os << "Resource("
-     << "client_resource_id=" << resource.client_resource_id << " "
-     << "v_cpu_cores=" << dec << resource.v_cpu_cores << " "
-     << "memory_mb=" << dec << resource.memory_mb << " "
-     << "asked_location=" << resource.askedLocation << " "
-     << "enforcement=" << resource.enforcement << ")";
-  return os;
-}
-
-ostream& operator<<(ostream& os, const TAllocatedResource& resource) {
-  os << "Allocated Resource("
-     << "reservation_id=" << resource.reservation_id << " "
-     << "client_resource_id=" << resource.client_resource_id << " "
-     << "rm_resource_id=" << resource.rm_resource_id << " "
-     << "v_cpu_cores=" << dec << resource.v_cpu_cores << " "
-     << "memory_mb=" << dec << resource.memory_mb << " "
-     << "location=" << resource.location << ")";
-  return os;
-}
-
-ostream& operator<<(ostream& os, const llama::TLlamaAMGetNodesRequest& request) {
-  os << "GetNodes Request(llama handle=" << request.am_handle << ")";
-  return os;
-}
-
-ostream& operator<<(ostream& os, const llama::TLlamaAMReservationRequest& request) {
-  os << "Reservation Request("
-     << "llama handle=" << request.am_handle << " "
-     << "queue=" << request.queue << " "
-     << "user=" << request.user << " "
-     << "gang=" << request.gang << " "
-     << "resources=[";
-  for (int i = 0; i < request.resources.size(); ++i) {
-    os << request.resources[i];
-    if (i + 1 != request.resources.size()) os << ",";
-  }
-  os << "])";
-  return os;
-}
-
-ostream& operator<<(ostream& os,
-    const llama::TLlamaAMReservationExpansionRequest& request) {
-  os << "Expansion Request("
-     << "llama handle=" << request.am_handle << " "
-     << "reservation id=" << request.expansion_of << " "
-     << "resource=" << request.resource << ")";
-  return os;
-}
-
-ostream& operator<<(ostream& os, const llama::TLlamaAMReleaseRequest& request) {
-  os << "Release Request("
-     << "llama handle=" << request.am_handle << " "
-     << "reservation id=" << request.reservation_id << ")";
-  return os;
-}
-
-llama::TUniqueId& operator<<(llama::TUniqueId& dest, const impala::TUniqueId& src) {
-  dest.lo = src.lo;
-  dest.hi = src.hi;
-  return dest;
-}
-
-impala::TUniqueId& operator<<(impala::TUniqueId& dest, const llama::TUniqueId& src) {
-  dest.lo = src.lo;
-  dest.hi = src.hi;
-  return dest;
-}
-
-bool operator==(const impala::TUniqueId& impala_id, const llama::TUniqueId& llama_id) {
-  return impala_id.lo == llama_id.lo && impala_id.hi == llama_id.hi;
-}
-
-llama::TNetworkAddress& operator<<(llama::TNetworkAddress& dest,
-    const impala::TNetworkAddress& src) {
-  dest.hostname = src.hostname;
-  dest.port = src.port;
-  return dest;
-}
-
-impala::TNetworkAddress& operator<<(impala::TNetworkAddress& dest,
-    const llama::TNetworkAddress& src) {
-  dest.hostname = src.hostname;
-  dest.port = src.port;
-  return dest;
-}
-
-impala::Status LlamaStatusToImpalaStatus(const TStatus& status,
-    const string& err_prefix) {
-  if (status.status_code == TStatusCode::OK) return impala::Status::OK();
-  stringstream ss;
-  ss << err_prefix << " " << join(status.error_msgs, ", ");
-  return impala::Status(ss.str());
-}
-
-string GetShortName(const string& user) {
-  if (user.empty() || user[0] == '/' || user[0] == '@') return user;
-
-  vector<string> components;
-  split(components, user, is_any_of("/@"));
-  return components[0];
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/llama-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/llama-util.h b/be/src/util/llama-util.h
deleted file mode 100644
index f6fc4ce..0000000
--- a/be/src/util/llama-util.h
+++ /dev/null
@@ -1,75 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_UTIL_LLAMA_UTIL_H
-#define IMPALA_UTIL_LLAMA_UTIL_H
-
-#include <ostream>
-#include <string>
-#include <boost/functional/hash.hpp>
-
-#include "gen-cpp/Types_types.h"  // for TUniqueId
-#include "gen-cpp/Llama_types.h"  // for TUniqueId
-#include "common/status.h"
-
-namespace llama {
-
-std::ostream& operator<<(std::ostream& os, const llama::TUniqueId& id);
-std::ostream& operator<<(std::ostream& os, const llama::TNetworkAddress& address);
-std::ostream& operator<<(std::ostream& os, const llama::TResource& resource);
-std::ostream& operator<<(std::ostream& os, const llama::TAllocatedResource& resource);
-
-std::ostream& operator<<(std::ostream& os,
-    const llama::TLlamaAMGetNodesRequest& request);
-std::ostream& operator<<(std::ostream& os,
-    const llama::TLlamaAMReservationRequest& request);
-std::ostream& operator<<(std::ostream& os,
-    const llama::TLlamaAMReservationExpansionRequest& request);
-std::ostream& operator<<(std::ostream& os,
-    const llama::TLlamaAMReleaseRequest& request);
-
-/// 'Assignment' operators to convert types between the llama and impala namespaces.
-llama::TUniqueId& operator<<(llama::TUniqueId& dest, const impala::TUniqueId& src);
-impala::TUniqueId& operator<<(impala::TUniqueId& dest, const llama::TUniqueId& src);
-
-std::string PrintId(const llama::TUniqueId& id, const std::string& separator = ":");
-
-bool operator==(const impala::TUniqueId& impala_id, const llama::TUniqueId& llama_id);
-
-llama::TNetworkAddress& operator<<(llama::TNetworkAddress& dest,
-    const impala::TNetworkAddress& src);
-impala::TNetworkAddress& operator<<(impala::TNetworkAddress& dest,
-    const llama::TNetworkAddress& src);
-
-impala::Status LlamaStatusToImpalaStatus(const llama::TStatus& status,
-    const std::string& err_prefix = "");
-
-/// This function must be called 'hash_value' to be picked up by boost.
-inline std::size_t hash_value(const llama::TUniqueId& id) {
-  std::size_t seed = 0;
-  boost::hash_combine(seed, id.lo);
-  boost::hash_combine(seed, id.hi);
-  return seed;
-}
-
-/// Get the short version of the user name (the user's name up to the first '/' or '@')
-/// If neither are found (or are found at the beginning of the user name) return username.
-std::string GetShortName(const std::string& user);
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/thread-pool.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread-pool.h b/be/src/util/thread-pool.h
index 8fc1bcc..ccc49c9 100644
--- a/be/src/util/thread-pool.h
+++ b/be/src/util/thread-pool.h
@@ -114,10 +114,6 @@ class ThreadPool {
     Join();
   }
 
-  Status AssignToCgroup(const std::string& cgroup) {
-    return threads_.SetCgroup(cgroup);
-  }
-
  private:
   /// Driver method for each thread in the pool. Continues to read work from the queue
   /// until the pool is shutdown.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/thread.cc
----------------------------------------------------------------------
diff --git a/be/src/util/thread.cc b/be/src/util/thread.cc
index 6a8b9b8..757ba59 100644
--- a/be/src/util/thread.cc
+++ b/be/src/util/thread.cc
@@ -26,7 +26,6 @@
 #include "util/coding-util.h"
 #include "util/debug-util.h"
 #include "util/error-util.h"
-#include "util/cgroups-mgr.h"
 #include "util/metrics.h"
 #include "util/webserver.h"
 #include "util/os-util.h"
@@ -321,10 +320,6 @@ void Thread::SuperviseThread(const string& name, const string& category,
 
 Status ThreadGroup::AddThread(Thread* thread) {
   threads_.push_back(thread);
-  if (!cgroup_path_.empty()) {
-    DCHECK(cgroups_mgr_ != NULL);
-    RETURN_IF_ERROR(cgroups_mgr_->AssignThreadToCgroup(*thread, cgroup_path_));
-  }
   return Status::OK();
 }
 
@@ -332,13 +327,4 @@ void ThreadGroup::JoinAll() {
   for (const Thread& thread: threads_) thread.Join();
 }
 
-Status ThreadGroup::SetCgroup(const string& cgroup) {
-  DCHECK(cgroups_mgr_ != NULL);
-  cgroup_path_ = cgroup;
-  for (const Thread& t: threads_) {
-    RETURN_IF_ERROR(cgroups_mgr_->AssignThreadToCgroup(t, cgroup));
-  }
-  return Status::OK();
-}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/thread.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread.h b/be/src/util/thread.h
index 8c880d2..4e2b65d 100644
--- a/be/src/util/thread.h
+++ b/be/src/util/thread.h
@@ -31,7 +31,6 @@ namespace impala {
 
 class MetricGroup;
 class Webserver;
-class CgroupsMgr;
 
 /// Thin wrapper around boost::thread that can register itself with the singleton
 /// ThreadMgr (a private class implemented in thread.cc entirely, which tracks all live
@@ -165,39 +164,19 @@ class ThreadGroup {
  public:
   ThreadGroup() {}
 
-  ThreadGroup(CgroupsMgr* cgroups_mgr, const std::string& cgroup)
-        : cgroups_mgr_(cgroups_mgr), cgroup_path_(cgroup) { }
-
   /// Adds a new Thread to this group. The ThreadGroup takes ownership of the Thread, and
   /// will destroy it when the ThreadGroup is destroyed.  Threads will linger until that
   /// point (even if terminated), however, so callers should be mindful of the cost of
   /// placing very many threads in this set.
-  /// If cgroup_path_ / cgroup_prefix_ are set, the thread will be added to the specified
-  /// cgroup and an error will be returned if that operation fails.
   Status AddThread(Thread* thread);
 
   /// Waits for all threads to finish. DO NOT call this from a thread inside this set;
   /// deadlock will predictably ensue.
   void JoinAll();
 
-  /// Assigns all current and future threads to the given cgroup managed by cgroups_mgr_.
-  /// Must be called after SetCgroupsMgr() if groups_mgr_ has not been set already.
-  /// Returns an error if any assignment was not possible, but does not undo previously
-  /// successful assignments.
-  Status SetCgroup(const std::string& cgroup);
-
-  void SetCgroupsMgr(CgroupsMgr* cgroups_mgr) { cgroups_mgr_ = cgroups_mgr; }
-
  private:
   /// All the threads grouped by this set.
   boost::ptr_vector<Thread> threads_;
-
-  /// Cgroups manager for assigning threads in this group to cgroups. Not owned.
-  CgroupsMgr* cgroups_mgr_;
-
-  /// If not empty, every thread added to this group will also be placed in the
-  /// cgroup_path_ managed by the cgroups_mgr_.
-  std::string cgroup_path_;
 };
 
 /// Initialises the threading subsystem. Must be called before a Thread is created.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/uid-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/uid-util.h b/be/src/util/uid-util.h
index c78a9ea..f0f87ec 100644
--- a/be/src/util/uid-util.h
+++ b/be/src/util/uid-util.h
@@ -37,10 +37,7 @@ inline std::size_t hash_value(const impala::TUniqueId& id) {
   return seed;
 }
 
-/// Templated so that this method is not namespace-specific (since we also call this on
-/// llama::TUniqueId)
-template <typename T>
-inline void UUIDToTUniqueId(const boost::uuids::uuid& uuid, T* unique_id) {
+inline void UUIDToTUniqueId(const boost::uuids::uuid& uuid, TUniqueId* unique_id) {
   memcpy(&(unique_id->hi), &uuid.data[0], 8);
   memcpy(&(unique_id->lo), &uuid.data[8], 8);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/bin/bootstrap_toolchain.py
----------------------------------------------------------------------
diff --git a/bin/bootstrap_toolchain.py b/bin/bootstrap_toolchain.py
index d8621c4..6524e82 100755
--- a/bin/bootstrap_toolchain.py
+++ b/bin/bootstrap_toolchain.py
@@ -22,7 +22,7 @@
 # that we can deduce the version settings of the dependencies from the environment.
 # IMPALA_TOOLCHAIN indicates the location where the prebuilt artifacts should be extracted
 # to. If DOWNLOAD_CDH_COMPONENTS is set to true, this script will also download and extract
-# the CDH components (i.e. Hadoop, Hive, HBase, Llama, Llama-minikdc and Sentry) into
+# the CDH components (i.e. Hadoop, Hive, HBase and Sentry) into
 # CDH_COMPONENTS_HOME.
 #
 # The script is called as follows without any additional parameters:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/bin/create-test-configuration.sh
----------------------------------------------------------------------
diff --git a/bin/create-test-configuration.sh b/bin/create-test-configuration.sh
index bbe8f61..8d695a4 100755
--- a/bin/create-test-configuration.sh
+++ b/bin/create-test-configuration.sh
@@ -148,7 +148,7 @@ if ${CLUSTER_DIR}/admin is_kerberized; then
   # strange, but making these symlinks also results in data loading
   # failures in the non-kerberized case.  Without these, mapreduce
   # jobs die in a kerberized cluster because they can't find their
-  # kerberos principals.  Obviously this has to be sorted out before
+  # kerberos principals. Obviously this has to be sorted out before
   # a kerberized cluster can load data.
   echo "Linking yarn and mapred from local cluster"
   ln -s ${CLUSTER_HADOOP_CONF_DIR}/yarn-site.xml

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/bin/generate_minidump_collection_testdata.py
----------------------------------------------------------------------
diff --git a/bin/generate_minidump_collection_testdata.py b/bin/generate_minidump_collection_testdata.py
index 27f9a42..a408e05 100755
--- a/bin/generate_minidump_collection_testdata.py
+++ b/bin/generate_minidump_collection_testdata.py
@@ -71,7 +71,6 @@ CONFIG_FILE = '''-beeswax_port=21000
 -max_lineage_log_file_size=5000
 -hostname=vb0204.halxg.cloudera.com
 -state_store_host=vb0202.halxg.cloudera.com
--enable_rm=false
 -state_store_port=24000
 -catalog_service_host=vb0202.halxg.cloudera.com
 -catalog_service_port=26000

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/bin/start-impala-cluster.py
----------------------------------------------------------------------
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index bfde4c3..b92fcf1 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -41,8 +41,6 @@ parser.add_option("--build_type", dest="build_type", default= 'latest',
 parser.add_option("--impalad_args", dest="impalad_args", action="append", type="string",
                   default=[],
                   help="Additional arguments to pass to each Impalad during startup")
-parser.add_option("--enable_rm", dest="enable_rm", action="store_true", default=False,
-                  help="Enable resource management with Yarn and Llama.")
 parser.add_option("--state_store_args", dest="state_store_args", action="append",
                   type="string", default=[],
                   help="Additional arguments to pass to State Store during startup")
@@ -91,8 +89,6 @@ IMPALAD_PORTS = ("-beeswax_port=%d -hs2_port=%d  -be_port=%d "
                  "-llama_callback_port=%d")
 JVM_ARGS = "-jvm_debug_port=%s -jvm_args=%s"
 BE_LOGGING_ARGS = "-log_filename=%s -log_dir=%s -v=%s -logbufsecs=5 -max_log_files=%s"
-RM_ARGS = ("-enable_rm=true -llama_addresses=%s -cgroup_hierarchy_path=%s "
-           "-fair_scheduler_allocation_path=%s")
 CLUSTER_WAIT_TIMEOUT_IN_SECONDS = 240
 # Kills have a timeout to prevent automated scripts from hanging indefinitely.
 # It is set to a high value to avoid failing if processes are slow to shut down.
@@ -208,20 +204,6 @@ def build_jvm_args(instance_num):
   BASE_JVM_DEBUG_PORT = 30000
   return JVM_ARGS % (BASE_JVM_DEBUG_PORT + instance_num, options.jvm_args)
 
-def build_rm_args(instance_num):
-  if not options.enable_rm: return ""
-  try:
-    cgroup_path = cgroups.create_impala_cgroup_path(instance_num + 1)
-  except Exception, ex:
-    raise RuntimeError("Unable to initialize RM: %s" % str(ex))
-  llama_address = "localhost:15000"
-
-  # Don't bother checking if the path doesn't exist, the impalad won't start up
-  relative_fs_cfg_path = 'cdh%s/node-%d/etc/hadoop/conf/fair-scheduler.xml' %\
-      (os.environ.get('CDH_MAJOR_VERSION'), instance_num + 1)
-  fs_cfg_path = os.path.join(os.environ.get('CLUSTER_DIR'), relative_fs_cfg_path)
-  return RM_ARGS % (llama_address, cgroup_path, fs_cfg_path)
-
 def start_impalad_instances(cluster_size):
   if cluster_size == 0:
     # No impalad instances should be started.
@@ -250,11 +232,10 @@ def start_impalad_instances(cluster_size):
 
     # impalad args from the --impalad_args flag. Also replacing '#ID' with the instance.
     param_args = (" ".join(options.impalad_args)).replace("#ID", str(i))
-    args = "--mem_limit=%s %s %s %s %s %s" %\
+    args = "--mem_limit=%s %s %s %s %s" %\
           (mem_limit,  # Goes first so --impalad_args will override it.
            build_impalad_logging_args(i, service_name), build_jvm_args(i),
-           build_impalad_port_args(i), param_args,
-           build_rm_args(i))
+           build_impalad_port_args(i), param_args)
     stderr_log_file_path = os.path.join(options.log_dir, '%s-error.log' % service_name)
     exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path)
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/common/thrift/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/common/thrift/CMakeLists.txt b/common/thrift/CMakeLists.txt
index e66bd89..3104ee2 100644
--- a/common/thrift/CMakeLists.txt
+++ b/common/thrift/CMakeLists.txt
@@ -162,7 +162,6 @@ set (SRC_FILES
   ImpalaService.thrift
   JniCatalog.thrift
   LineageGraph.thrift
-  Llama.thrift
   Logging.thrift
   NetworkTest.thrift
   MetricDefs.thrift
@@ -171,7 +170,6 @@ set (SRC_FILES
   Planner.thrift
   Partitions.thrift
   parquet.thrift
-  ResourceBrokerService.thrift
   Results.thrift
   RuntimeProfile.thrift
   StatestoreService.thrift

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 708cb46..732ea4a 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -387,6 +387,7 @@ struct TQueryExecRequest {
 
   // Estimated per-host CPU requirements in YARN virtual cores.
   // Used for resource management.
+  // TODO: Remove this and associated code in Planner.
   11: optional i16 per_host_vcores
 
   // List of replica hosts.  Used by the host_idx field of TScanRangeLocation.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index bf03d98..003a618 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -33,7 +33,6 @@ include "DataSinks.thrift"
 include "Results.thrift"
 include "RuntimeProfile.thrift"
 include "ImpalaService.thrift"
-include "Llama.thrift"
 
 // constants for TQueryOptions.num_nodes
 const i32 NUM_NODES_ALL = 0
@@ -366,12 +365,6 @@ struct TPlanFragmentInstanceCtx {
 
   // Id of this fragment in its role as a sender.
   11: optional i32 sender_id
-
-  // Resource reservation to run this plan fragment in.
-  12: optional Llama.TAllocatedResource reserved_resource
-
-  // Address of local node manager (used for expanding resource allocations)
-  13: optional Types.TNetworkAddress local_resource_address
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/common/thrift/Llama.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Llama.thrift b/common/thrift/Llama.thrift
deleted file mode 100644
index a9b7f5f..0000000
--- a/common/thrift/Llama.thrift
+++ /dev/null
@@ -1,276 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-namespace cpp llama
-namespace java com.cloudera.llama.thrift
-
-////////////////////////////////////////////////////////////////////////////////
-// DATA TYPES
-
-enum TLlamaServiceVersion {
-   V1
-}
-
-struct TUniqueId {
-  1: required i64 hi;
-  2: required i64 lo;
-}
-
-struct TNetworkAddress {
-  1: required string hostname;
-  2: required i32    port;
-}
-
-enum TStatusCode {
-  OK,
-  REQUEST_ERROR,
-  INTERNAL_ERROR
-}
-
-struct TStatus {
-  1: required TStatusCode status_code;
-  2: i16 error_code;
-  3: list<string> error_msgs;
-}
-
-enum TLocationEnforcement {
-  MUST,
-  PREFERRED,
-  DONT_CARE
-}
-
-struct TResource {
-  1: required TUniqueId            client_resource_id;
-  2: required i16                  v_cpu_cores;
-  3: required i32                  memory_mb;
-  4: required string               askedLocation;
-  5: required TLocationEnforcement enforcement;
-}
-
-struct TAllocatedResource {
-  1: required TUniqueId reservation_id;
-  2: required TUniqueId client_resource_id;
-  3: required string    rm_resource_id;
-  4: required i16       v_cpu_cores;
-  5: required i32       memory_mb;
-  6: required string    location;
-}
-
-struct TNodeCapacity {
-  1: required i16 total_v_cpu_cores;
-  2: required i32 total_memory_mb;
-  3: required i16 free_v_cpu_cores;
-  4: required i32 free_memory_mb;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// Llama AM Service
-
-struct TLlamaAMRegisterRequest {
-  1: required TLlamaServiceVersion version;
-  2: required TUniqueId            client_id;
-  3: required TNetworkAddress      notification_callback_service;
-}
-
-struct TLlamaAMRegisterResponse {
-  1: required TStatus   status;
-  2: optional TUniqueId am_handle;
-}
-
-struct TLlamaAMUnregisterRequest {
-  1: required TLlamaServiceVersion version;
-  2: required TUniqueId am_handle;
-}
-
-struct TLlamaAMUnregisterResponse {
-  1: required TStatus status;
-}
-
-struct TLlamaAMReservationRequest {
-  1: required TLlamaServiceVersion version;
-  2: required TUniqueId            am_handle;
-  3: required string               user;
-  4: optional string               queue;
-  5: required list<TResource>      resources;
-  6: required bool                 gang;
-  7: optional TUniqueId            reservation_id;
-}
-
-struct TLlamaAMReservationResponse {
-  1: required TStatus   status;
-  2: optional TUniqueId reservation_id;
-}
-
-struct TLlamaAMReservationExpansionRequest {
-  1: required TLlamaServiceVersion version;
-  2: required TUniqueId            am_handle;
-  3: required TUniqueId            expansion_of;
-  4: required TResource            resource;
-  5: optional TUniqueId            expansion_id;
-}
-
-struct TLlamaAMReservationExpansionResponse {
-  1: required TStatus   status;
-  2: optional TUniqueId expansion_id;
-}
-
-struct TLlamaAMReleaseRequest {
-  1: required TLlamaServiceVersion version;
-  2: required TUniqueId            am_handle;
-  3: required TUniqueId            reservation_id;
-}
-
-struct TLlamaAMReleaseResponse {
-  1: required TStatus status;
-}
-
-struct TLlamaAMGetNodesRequest {
-  1: required TLlamaServiceVersion version;
-  2: required TUniqueId            am_handle;
-}
-
-struct TLlamaAMGetNodesResponse {
-  1: required TStatus status;
-  2: optional list<string> nodes;
-}
-
-service LlamaAMService {
-
-  TLlamaAMRegisterResponse Register(1: TLlamaAMRegisterRequest request);
-
-  TLlamaAMUnregisterResponse Unregister(1: TLlamaAMUnregisterRequest request);
-
-  TLlamaAMReservationResponse Reserve(1: TLlamaAMReservationRequest request);
-
-    TLlamaAMReservationExpansionResponse Expand(
-    1: TLlamaAMReservationExpansionRequest request);
-
-  TLlamaAMReleaseResponse Release(1: TLlamaAMReleaseRequest request);
-
-  TLlamaAMGetNodesResponse GetNodes(1: TLlamaAMGetNodesRequest request);
-
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// Llama AM Admin Service
-
-struct TLlamaAMAdminReleaseRequest {
-  1: required TLlamaServiceVersion version;
-  2: optional bool                 do_not_cache = false;
-  3: optional list<string>         queues;
-  4: optional list<TUniqueId>      handles;
-  5: optional list<TUniqueId>      reservations;
-}
-
-struct TLlamaAMAdminReleaseResponse {
-  1: required TStatus status;
-}
-
-struct TLlamaAMAdminEmptyCacheRequest {
-  1: required TLlamaServiceVersion version;
-  2: optional bool                 allQueues = false;
-  3: optional list<string>         queues;
-}
-
-struct TLlamaAMAdminEmptyCacheResponse {
-  1: required TStatus status;
-}
-
-service LlamaAMAdminService {
-
-  TLlamaAMAdminReleaseResponse Release
-  (1: TLlamaAMAdminReleaseRequest request);
-
-  TLlamaAMAdminEmptyCacheResponse EmptyCache
-  (1: TLlamaAMAdminEmptyCacheRequest request);
-
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// Llama NM Service
-
-struct TLlamaNMRegisterRequest {
-  1: required TLlamaServiceVersion version;
-  2: required TUniqueId            client_id;
-  3: required TNetworkAddress      notification_callback_service;
-}
-
-struct TLlamaNMRegisterResponse {
-  1: required TStatus   status;
-  2: optional TUniqueId nm_handle;
-}
-
-struct TLlamaNMUnregisterRequest {
-  1: required TLlamaServiceVersion version;
-  2: required TUniqueId            nm_handle;
-}
-
-struct TLlamaNMUnregisterResponse {
-  1: required TStatus status;
-}
-
-service LlamaNMService {
-
-  TLlamaNMRegisterResponse Register(1: TLlamaNMRegisterRequest request);
-
-  TLlamaNMUnregisterResponse Unregister(1: TLlamaNMUnregisterRequest request);
-
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// Llama Notification Callback Service
-
-struct TLlamaAMNotificationRequest {
-  1: required TLlamaServiceVersion     version;
-  2: required TUniqueId                am_handle;
-  3: required bool                     heartbeat;
-  4: optional list<TUniqueId>          allocated_reservation_ids;
-  5: optional list<TAllocatedResource> allocated_resources;
-  6: optional list<TUniqueId>          rejected_reservation_ids;
-  7: optional list<TUniqueId>          rejected_client_resource_ids;
-  8: optional list<TUniqueId>          lost_client_resource_ids;
-  9: optional list<TUniqueId>          preempted_reservation_ids;
-  10: optional list<TUniqueId>         preempted_client_resource_ids;
-  11: optional list<TUniqueId>         admin_released_reservation_ids;
-  12: optional list<TUniqueId>         lost_reservation_ids;
-}
-
-struct TLlamaAMNotificationResponse {
-  1: required TStatus status;
-}
-
-struct TLlamaNMNotificationRequest {
-  1: required TLlamaServiceVersion version;
-  2: required TUniqueId            nm_handle;
-  3: required TNodeCapacity        node_capacity;
-  4: list<string>                  preempted_rm_resource_ids;
-}
-
-struct TLlamaNMNotificationResponse {
-  1: required TStatus status;
-}
-
-service LlamaNotificationService {
-
-  TLlamaAMNotificationResponse AMNotification(
-    1: TLlamaAMNotificationRequest request);
-
-  TLlamaNMNotificationResponse NMNotification(
-    1: TLlamaNMNotificationRequest request);
-}
-
-////////////////////////////////////////////////////////////////////////////////


Mime
View raw message