impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbap...@apache.org
Subject [1/2] incubator-impala git commit: IMPALA-3394: Add tests, make BackendConfig own class, refactor
Date Thu, 25 Aug 2016 21:29:19 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master 8cab36cf6 -> 19a2dcfbe


IMPALA-3394: Add tests, make BackendConfig own class, refactor

This change factors SimpleScheduler::BackendConfig into an own class and
adds unit tests for it.

Change-Id: I2d3acb6f68b16ca0af06dad0098d7ec1eff41202
Reviewed-on: http://gerrit.cloudera.org:8080/4116
Reviewed-by: Matthew Jacobs <mj@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f5541d60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f5541d60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f5541d60

Branch: refs/heads/master
Commit: f5541d604046a9f47d1000c2ddc8f38a732c6456
Parents: 8cab36c
Author: Lars Volker <lv@cloudera.com>
Authored: Fri Aug 19 00:36:19 2016 +0200
Committer: Internal Jenkins <cloudera-hudson@gerrit.cloudera.org>
Committed: Thu Aug 25 20:23:04 2016 +0000

----------------------------------------------------------------------
 be/src/scheduling/CMakeLists.txt         |   2 +
 be/src/scheduling/backend-config-test.cc | 100 ++++++++++++++++++++
 be/src/scheduling/backend-config.cc      |  89 ++++++++++++++++++
 be/src/scheduling/backend-config.h       |  75 +++++++++++++++
 be/src/scheduling/simple-scheduler.cc    | 130 +++-----------------------
 be/src/scheduling/simple-scheduler.h     |  65 ++-----------
 be/src/util/network-util.cc              |  38 +++++++-
 be/src/util/network-util.h               |  20 +++-
 8 files changed, 337 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/scheduling/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt
index 5ce7ff9..c5b4eb4 100644
--- a/be/src/scheduling/CMakeLists.txt
+++ b/be/src/scheduling/CMakeLists.txt
@@ -25,6 +25,7 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/scheduling")
 # TODO: Move other scheduling-related classes here
 add_library(Scheduling STATIC
   admission-controller.cc
+  backend-config.cc
   query-resource-mgr.cc
   query-schedule.cc
   request-pool-service.cc
@@ -33,5 +34,6 @@ add_library(Scheduling STATIC
 add_dependencies(Scheduling thrift-deps)
 
 ADD_BE_TEST(simple-scheduler-test)
+ADD_BE_TEST(backend-config-test)
 # TODO: Add BE test
 # ADD_BE_TEST(admission-controller-test)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/scheduling/backend-config-test.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/backend-config-test.cc b/be/src/scheduling/backend-config-test.cc
new file mode 100644
index 0000000..82dc6a5
--- /dev/null
+++ b/be/src/scheduling/backend-config-test.cc
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "scheduling/backend-config.h"
+
+#include "common/logging.h"
+#include "common/names.h"
+#include "util/network-util.h"
+#include "util/thread.h"
+
+namespace impala {
+
+/// Test that BackendConfig can be created from a vector of backends.
+TEST(BackendConfigTest, MakeFromBackendVector) {
+  // This address needs to be resolvable using getaddrinfo().
+  vector<TNetworkAddress> backends {MakeNetworkAddress("localhost", 1001)};
+  BackendConfig backend_config(backends);
+  IpAddr backend_ip;
+  bool ret = backend_config.LookUpBackendIp(backends[0].hostname, &backend_ip);
+  ASSERT_TRUE(ret);
+  EXPECT_EQ("127.0.0.1", backend_ip);
+}
+
+/// Test adding multiple backends on different hosts.
+TEST(BackendConfigTest, AddBackends) {
+  BackendConfig backend_config;
+  backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1001));
+  backend_config.AddBackend(MakeBackendDescriptor("host_2", "10.0.0.2", 1002));
+  ASSERT_EQ(2, backend_config.NumBackends());
+  IpAddr backend_ip;
+  ASSERT_TRUE(backend_config.LookUpBackendIp("host_1", &backend_ip));
+  EXPECT_EQ("10.0.0.1", backend_ip);
+  ASSERT_TRUE(backend_config.LookUpBackendIp("host_2", &backend_ip));
+  EXPECT_EQ("10.0.0.2", backend_ip);
+}
+
+/// Test adding multiple backends on the same host.
+TEST(BackendConfigTest, MultipleBackendsOnSameHost) {
+  BackendConfig backend_config;
+  backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1001));
+  backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1002));
+  IpAddr backend_ip;
+  ASSERT_TRUE(backend_config.LookUpBackendIp("host_1", &backend_ip));
+  EXPECT_EQ("10.0.0.1", backend_ip);
+  const BackendConfig::BackendList& backend_list =
+      backend_config.GetBackendListForHost("10.0.0.1");
+  EXPECT_EQ(2, backend_list.size());
+}
+
+/// Test removing a backend.
+TEST(BackendConfigTest, RemoveBackend) {
+  BackendConfig backend_config;
+  backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1001));
+  backend_config.AddBackend(MakeBackendDescriptor("host_2", "10.0.0.2", 1002));
+  backend_config.RemoveBackend(MakeBackendDescriptor("host_2", "10.0.0.2", 1002));
+  IpAddr backend_ip;
+  ASSERT_TRUE(backend_config.LookUpBackendIp("host_1", &backend_ip));
+  EXPECT_EQ("10.0.0.1", backend_ip);
+  ASSERT_FALSE(backend_config.LookUpBackendIp("host_2", &backend_ip));
+}
+
+/// Test removing one of multiple backends on the same host (IMPALA-3944).
+TEST(BackendConfigTest, RemoveBackendOnSameHost) {
+  BackendConfig backend_config;
+  backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1001));
+  backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1002));
+  backend_config.RemoveBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1002));
+  IpAddr backend_ip;
+  ASSERT_TRUE(backend_config.LookUpBackendIp("host_1", &backend_ip));
+  EXPECT_EQ("10.0.0.1", backend_ip);
+  const BackendConfig::BackendList& backend_list =
+      backend_config.GetBackendListForHost("10.0.0.1");
+  EXPECT_EQ(1, backend_list.size());
+}
+
+}  // end namespace impala
+
+int main(int argc, char **argv) {
+  google::InitGoogleLogging(argv[0]);
+  impala::CpuInfo::Init();
+  impala::InitThreading();
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/scheduling/backend-config.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/backend-config.cc b/be/src/scheduling/backend-config.cc
new file mode 100644
index 0000000..e5c6824
--- /dev/null
+++ b/be/src/scheduling/backend-config.cc
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "scheduling/backend-config.h"
+
+namespace impala{
+
+BackendConfig::BackendConfig(const std::vector<TNetworkAddress>& backends) {
+  // Construct backend_map and backend_ip_map.
+  for (const TNetworkAddress& backend: backends) {
+    IpAddr ip;
+    Status status = HostnameToIpAddr(backend.hostname, &ip);
+    if (!status.ok()) {
+      VLOG(1) << status.GetDetail();
+      continue;
+    }
+    AddBackend(MakeBackendDescriptor(backend.hostname, ip, backend.port));
+  }
+}
+
+const BackendConfig::BackendList& BackendConfig::GetBackendListForHost(
+    const IpAddr& ip) const {
+  BackendMap::const_iterator it = backend_map_.find(ip);
+  DCHECK(it != backend_map_.end());
+  return it->second;
+}
+
+void BackendConfig::GetAllBackendIps(std::vector<IpAddr>* ip_addresses) const {
+  ip_addresses->reserve(NumBackends());
+  for (auto& it: backend_map_) ip_addresses->push_back(it.first);
+}
+
+void BackendConfig::GetAllBackends(BackendList* backends) const {
+  for (const auto& backend_list: backend_map_) {
+    backends->insert(backends->end(), backend_list.second.begin(),
+        backend_list.second.end());
+  }
+}
+
+void BackendConfig::AddBackend(const TBackendDescriptor& be_desc) {
+  DCHECK(!be_desc.ip_address.empty());
+  BackendList& be_descs = backend_map_[be_desc.ip_address];
+  if (find(be_descs.begin(), be_descs.end(), be_desc) == be_descs.end()) {
+    be_descs.push_back(be_desc);
+  }
+  backend_ip_map_[be_desc.address.hostname] = be_desc.ip_address;
+}
+
+void BackendConfig::RemoveBackend(const TBackendDescriptor& be_desc) {
+  auto be_descs_it = backend_map_.find(be_desc.ip_address);
+  if (be_descs_it != backend_map_.end()) {
+    BackendList* be_descs = &be_descs_it->second;
+    be_descs->erase(remove(be_descs->begin(), be_descs->end(), be_desc), be_descs->end());
+    if (be_descs->empty()) {
+      backend_map_.erase(be_descs_it);
+      backend_ip_map_.erase(be_desc.address.hostname);
+    }
+  }
+}
+
+bool BackendConfig::LookUpBackendIp(const Hostname& hostname, IpAddr* ip) const {
+  // Check if hostname is already a valid IP address.
+  if (backend_map_.find(hostname) != backend_map_.end()) {
+    if (ip) *ip = hostname;
+    return true;
+  }
+  auto it = backend_ip_map_.find(hostname);
+  if (it != backend_ip_map_.end()) {
+    if (ip) *ip = it->second;
+    return true;
+  }
+  return false;
+}
+
+}  // end ns impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/scheduling/backend-config.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/backend-config.h b/be/src/scheduling/backend-config.h
new file mode 100644
index 0000000..25f8292
--- /dev/null
+++ b/be/src/scheduling/backend-config.h
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef SCHEDULING_BACKEND_CONFIG_H
+#define SCHEDULING_BACKEND_CONFIG_H
+
+#include <vector>
+
+#include <boost/unordered_map.hpp>
+
+#include "gen-cpp/StatestoreService_types.h"
+#include "gen-cpp/Types_types.h"
+#include "util/network-util.h"
+
+namespace impala {
+
+/// Configuration class to store a list of backends per IP address and a mapping from
+/// hostnames to IP addresses.
+class BackendConfig {
+ public:
+  BackendConfig() {}
+
+  /// Construct from list of backends.
+  BackendConfig(const std::vector<TNetworkAddress>& backends);
+
+  /// List of Backends.
+  typedef std::list<TBackendDescriptor> BackendList;
+
+  /// Return the list of backends on a particular host. The caller must make sure that the
+  /// host is actually contained in backend_map_.
+  const BackendList& GetBackendListForHost(const IpAddr& ip) const;
+
+  void GetAllBackendIps(std::vector<IpAddr>* ip_addresses) const;
+  void GetAllBackends(BackendList* backends) const;
+  void AddBackend(const TBackendDescriptor& be_desc);
+  void RemoveBackend(const TBackendDescriptor& be_desc);
+
+  /// Look up the IP address of 'hostname' in the internal backend maps and return
+  /// whether the lookup was successful. If 'hostname' itself is a valid IP address and
+  /// is contained in backend_map_, then it is copied to 'ip' and true is returned. 'ip'
+  /// can be NULL if the caller only wants to check whether the lookup succeeds. Use this
+  /// method to resolve datanode hostnames to IP addresses during scheduling, to prevent
+  /// blocking on the OS.
+  bool LookUpBackendIp(const Hostname& hostname, IpAddr* ip) const;
+
+  int NumBackends() const { return backend_map_.size(); }
+
+ private:
+  /// Map from a host's IP address to a list of backends running on that node.
+  typedef boost::unordered_map<IpAddr, BackendList> BackendMap;
+  BackendMap backend_map_;
+
+  /// Map from a hostname to its IP address to support hostname based backend lookup. It
+  /// contains entries for all backends in backend_map_ and needs to be updated whenever
+  /// backend_map_ changes.
+  typedef boost::unordered_map<Hostname, IpAddr> BackendIpAddressMap;
+  BackendIpAddressMap backend_ip_map_;
+};
+
+}  // end ns impala
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index 5160393..9a865f0 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -185,7 +185,7 @@ Status SimpleScheduler::Init() {
   if (metrics_ != NULL) {
     // This is after registering with the statestored, so we already have to synchronize
     // access to the backend_config_ shared_ptr.
-    int num_backends = GetBackendConfig()->backend_map().size();
+    int num_backends = GetBackendConfig()->NumBackends();
     total_assignments_ = metrics_->AddCounter<int64_t>(ASSIGNMENTS_KEY, 0);
     total_local_assignments_ = metrics_->AddCounter<int64_t>(LOCAL_ASSIGNMENTS_KEY,
0);
     initialized_ = metrics_->AddProperty(SCHEDULER_INIT_KEY, true);
@@ -210,10 +210,11 @@ Status SimpleScheduler::Init() {
 
 void SimpleScheduler::BackendsUrlCallback(const Webserver::ArgumentMap& args,
     Document* document) {
-  BackendList backends;
-  GetAllKnownBackends(&backends);
+  BackendConfig::BackendList backends;
+  BackendConfigPtr backend_config = GetBackendConfig();
+  backend_config->GetAllBackends(&backends);
   Value backends_list(kArrayType);
-  for (const BackendList::value_type& backend: backends) {
+  for (const TBackendDescriptor& backend: backends) {
     Value str(TNetworkAddressToString(backend.address).c_str(), document->GetAllocator());
     backends_list.PushBack(str, document->GetAllocator());
   }
@@ -334,16 +335,6 @@ void SimpleScheduler::SetBackendConfig(const BackendConfigPtr& backend_config)
   backend_config_ = backend_config;
 }
 
-
-void SimpleScheduler::GetAllKnownBackends(BackendList* backends) {
-  backends->clear();
-  BackendConfigPtr backend_config = GetBackendConfig();
-  for (const BackendMap::value_type& backend_list: backend_config->backend_map())
{
-    backends->insert(backends->end(), backend_list.second.begin(),
-                     backend_list.second.end());
-  }
-}
-
 Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec_request,
     QuerySchedule* schedule) {
   map<TPlanNodeId, vector<TScanRangeLocations>>::const_iterator entry;
@@ -871,109 +862,21 @@ void SimpleScheduler::HandleLostResource(const TUniqueId& client_resource_id)
{
   }
 }
 
-Status SimpleScheduler::HostnameToIpAddr(const Hostname& hostname, IpAddr* ip) {
-  // Try to resolve via the operating system.
-  vector<IpAddr> ipaddrs;
-  Status status = HostnameToIpAddrs(hostname, &ipaddrs);
-  if (!status.ok() || ipaddrs.empty()) {
-    stringstream ss;
-    ss << "Failed to resolve " << hostname << ": " << status.GetDetail();
-    return Status(ss.str());
-  }
-
-  // HostnameToIpAddrs() calls getaddrinfo() from glibc and will preserve the order of the
-  // result. RFC 3484 only specifies a partial order so we need to sort the addresses
-  // before picking the first non-localhost one.
-  sort(ipaddrs.begin(), ipaddrs.end());
-
-  // Try to find a non-localhost address, otherwise just use the first IP address
-  // returned.
-  *ip = ipaddrs[0];
-  if (!FindFirstNonLocalhost(ipaddrs, ip)) {
-    VLOG(3) << "Only localhost addresses found for " << hostname;
-  }
-  return Status::OK();
-}
-
-SimpleScheduler::BackendConfig::BackendConfig(
-    const std::vector<TNetworkAddress>& backends) {
-  // Construct backend_map and backend_ip_map.
-  for (int i = 0; i < backends.size(); ++i) {
-    IpAddr ip;
-    Status status = HostnameToIpAddr(backends[i].hostname, &ip);
-    if (!status.ok()) {
-      VLOG(1) << status.GetDetail();
-      continue;
-    }
-
-    BackendMap::iterator it = backend_map_.find(ip);
-    if (it == backend_map_.end()) {
-      it = backend_map_.insert(
-          make_pair(ip, BackendList())).first;
-      backend_ip_map_[backends[i].hostname] = ip;
-    }
-
-    TBackendDescriptor descriptor;
-    descriptor.address = MakeNetworkAddress(ip, backends[i].port);
-    descriptor.ip_address = ip;
-    it->second.push_back(descriptor);
-  }
-}
-
-void SimpleScheduler::BackendConfig::AddBackend(const TBackendDescriptor& be_desc) {
-  DCHECK(!be_desc.ip_address.empty());
-  BackendList* be_descs = &backend_map_[be_desc.ip_address];
-  if (find(be_descs->begin(), be_descs->end(), be_desc) == be_descs->end()) {
-    be_descs->push_back(be_desc);
-  }
-  backend_ip_map_[be_desc.address.hostname] = be_desc.ip_address;
-}
-
-void SimpleScheduler::BackendConfig::RemoveBackend(const TBackendDescriptor& be_desc)
{
-  auto be_descs_it = backend_map_.find(be_desc.ip_address);
-  if (be_descs_it != backend_map_.end()) {
-    BackendList* be_descs = &be_descs_it->second;
-    be_descs->erase(remove(be_descs->begin(), be_descs->end(), be_desc), be_descs->end());
-    if (be_descs->empty()) {
-      backend_map_.erase(be_descs_it);
-      backend_ip_map_.erase(be_desc.address.hostname);
-    }
-  }
-}
-
-bool SimpleScheduler::BackendConfig::LookUpBackendIp(const Hostname& hostname,
-    IpAddr* ip) const {
-  // Check if hostname is already a valid IP address.
-  if (backend_map_.find(hostname) != backend_map_.end()) {
-    if (ip) *ip = hostname;
-    return true;
-  }
-  auto it = backend_ip_map_.find(hostname);
-  if (it != backend_ip_map_.end()) {
-    if (ip) *ip = it->second;
-    return true;
-  }
-  return false;
-}
-
 SimpleScheduler::AssignmentCtx::AssignmentCtx(
     const BackendConfig& backend_config,
     IntCounter* total_assignments, IntCounter* total_local_assignments)
   : backend_config_(backend_config), first_unused_backend_idx_(0),
     total_assignments_(total_assignments),
     total_local_assignments_(total_local_assignments) {
-  random_backend_order_.reserve(backend_map().size());
-  for (auto& v: backend_map()) random_backend_order_.push_back(&v);
+  backend_config.GetAllBackendIps(&random_backend_order_);
   std::mt19937 g(rand());
   std::shuffle(random_backend_order_.begin(), random_backend_order_.end(), g);
   // Initialize inverted map for backend rank lookups
   int i = 0;
-  for (const BackendMap::value_type* v: random_backend_order_) {
-    random_backend_rank_[v->first] = i++;
-  }
+  for (const IpAddr& ip: random_backend_order_) random_backend_rank_[ip] = i++;
 }
 
-const SimpleScheduler::IpAddr* SimpleScheduler::AssignmentCtx::SelectLocalBackendHost(
+const IpAddr* SimpleScheduler::AssignmentCtx::SelectLocalBackendHost(
     const std::vector<IpAddr>& data_locations, bool break_ties_by_rank) {
   DCHECK(!data_locations.empty());
   // List of candidate indexes into 'data_locations'.
@@ -1005,7 +908,7 @@ const SimpleScheduler::IpAddr* SimpleScheduler::AssignmentCtx::SelectLocalBacken
   return &data_locations[*min_rank_idx];
 }
 
-const SimpleScheduler::IpAddr* SimpleScheduler::AssignmentCtx::SelectRemoteBackendHost()
{
+const IpAddr* SimpleScheduler::AssignmentCtx::SelectRemoteBackendHost() {
   const IpAddr* candidate_ip;
   if (HasUnusedBackends()) {
     // Pick next unused backend.
@@ -1024,11 +927,9 @@ bool SimpleScheduler::AssignmentCtx::HasUnusedBackends() const {
   return first_unused_backend_idx_ < random_backend_order_.size();
 }
 
-const SimpleScheduler::IpAddr*
-    SimpleScheduler::AssignmentCtx::GetNextUnusedBackendAndIncrement() {
+const IpAddr* SimpleScheduler::AssignmentCtx::GetNextUnusedBackendAndIncrement() {
   DCHECK(HasUnusedBackends());
-  const IpAddr* ip = &(random_backend_order_[first_unused_backend_idx_++])->first;
-  DCHECK(backend_map().find(*ip) != backend_map().end());
+  const IpAddr* ip = &random_backend_order_[first_unused_backend_idx_++];
   return ip;
 }
 
@@ -1040,14 +941,14 @@ int SimpleScheduler::AssignmentCtx::GetBackendRank(const IpAddr&
ip) const {
 
 void SimpleScheduler::AssignmentCtx::SelectBackendOnHost(const IpAddr& backend_ip,
     TBackendDescriptor* backend) {
-  BackendMap::const_iterator backend_it = backend_map().find(backend_ip);
-  DCHECK(backend_it != backend_map().end());
-  const BackendList& backends_on_host = backend_it->second;
+  DCHECK(backend_config_.LookUpBackendIp(backend_ip, NULL));
+  const BackendConfig::BackendList& backends_on_host =
+      backend_config_.GetBackendListForHost(backend_ip);
   DCHECK(backends_on_host.size() > 0);
   if (backends_on_host.size() == 1) {
     *backend = *backends_on_host.begin();
   } else {
-    BackendList::const_iterator* next_backend_on_host;
+    BackendConfig::BackendList::const_iterator* next_backend_on_host;
     next_backend_on_host = FindOrInsert(&next_backend_per_host_, backend_ip,
         backends_on_host.begin());
     DCHECK(find(backends_on_host.begin(), backends_on_host.end(), **next_backend_on_host)
@@ -1078,7 +979,6 @@ void SimpleScheduler::AssignmentCtx::RecordScanRangeAssignment(
   IpAddr backend_ip;
   backend_config_.LookUpBackendIp(backend.address.hostname, &backend_ip);
   DCHECK(!backend_ip.empty());
-  DCHECK(backend_map().find(backend_ip) != backend_map().end());
   assignment_heap_.InsertOrUpdate(backend_ip, scan_range_length,
       GetBackendRank(backend_ip));
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/scheduling/simple-scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.h b/be/src/scheduling/simple-scheduler.h
index a6ab93e..dd119c2 100644
--- a/be/src/scheduling/simple-scheduler.h
+++ b/be/src/scheduling/simple-scheduler.h
@@ -34,6 +34,7 @@
 #include "util/metrics.h"
 #include "util/runtime-profile.h"
 #include "scheduling/admission-controller.h"
+#include "scheduling/backend-config.h"
 #include "gen-cpp/Types_types.h"  // for TNetworkAddress
 #include "gen-cpp/ResourceBrokerService_types.h"
 #include "rapidjson/rapidjson.h"
@@ -95,55 +96,10 @@ class SimpleScheduler : public Scheduler {
   virtual void HandleLostResource(const TUniqueId& client_resource_id);
 
  private:
-  /// Type to store hostnames, which can be rfc1123 hostnames or IPv4 addresses.
-  typedef std::string Hostname;
-
-  /// Type to store IPv4 addresses.
-  typedef std::string IpAddr;
-
-  typedef std::list<TBackendDescriptor> BackendList;
-
-  /// Map from a host's IP address to a list of backends running on that node.
-  typedef boost::unordered_map<IpAddr, BackendList> BackendMap;
-
   /// Map from a host's IP address to the next backend to be round-robin scheduled for
   /// that host (needed for setups with multiple backends on a single host)
-  typedef boost::unordered_map<IpAddr, BackendList::const_iterator> NextBackendPerHost;
-
-  /// Map from a hostname to its IP address to support hostname based backend lookup.
-  typedef boost::unordered_map<Hostname, IpAddr> BackendIpAddressMap;
-
-  /// Configuration class to store a list of backends per IP address and a mapping from
-  /// hostnames to IP addresses. backend_ip_map contains entries for all backends in
-  /// backend_map and needs to be updated whenever backend_map changes. Each plan node
-  /// creates a read-only copy of the scheduler's current backend_config_ to use during
-  /// scheduling.
-  class BackendConfig {
-   public:
-    BackendConfig() {}
-
-    /// Construct config from list of backends.
-    BackendConfig(const std::vector<TNetworkAddress>& backends);
-
-    void AddBackend(const TBackendDescriptor& be_desc);
-    void RemoveBackend(const TBackendDescriptor& be_desc);
-
-    /// Look up the IP address of 'hostname' in the internal backend maps and return
-    /// whether the lookup was successful. If 'hostname' itself is a valid IP address then
-    /// it is copied to 'ip' and true is returned. 'ip' can be NULL if the caller only
-    /// wants to check whether the lookup succeeds. Use this method to resolve datanode
-    /// hostnames to IP addresses during scheduling, to prevent blocking on the OS.
-    bool LookUpBackendIp(const Hostname& hostname, IpAddr* ip) const;
-
-    int NumBackends() const { return backend_map().size(); }
-
-    const BackendMap& backend_map() const { return backend_map_; }
-    const BackendIpAddressMap& backend_ip_map() const { return backend_ip_map_; }
-
-   private:
-    BackendMap backend_map_;
-    BackendIpAddressMap backend_ip_map_;
-  };
+  typedef boost::unordered_map<IpAddr, BackendConfig::BackendList::const_iterator>
+      NextBackendPerHost;
 
   typedef std::shared_ptr<const BackendConfig> BackendConfigPtr;
 
@@ -250,7 +206,6 @@ class SimpleScheduler : public Scheduler {
         FragmentScanRangeAssignment* assignment);
 
     const BackendConfig& backend_config() const { return backend_config_; }
-    const BackendMap& backend_map() const { return backend_config_.backend_map(); }
 
     /// Print the assignment and statistics to VLOG_FILE.
     void PrintAssignment(const FragmentScanRangeAssignment& assignment);
@@ -279,7 +234,7 @@ class SimpleScheduler : public Scheduler {
     int first_unused_backend_idx_;
 
     /// Store a random permutation of backend hosts to select backends from.
-    std::vector<const BackendMap::value_type*> random_backend_order_;
+    std::vector<IpAddr> random_backend_order_;
 
     /// Track round robin information per backend host.
     NextBackendPerHost next_backend_per_host_;
@@ -301,7 +256,9 @@ class SimpleScheduler : public Scheduler {
 
   /// The scheduler's backend configuration. When receiving changes to the backend
   /// configuration from the statestore we will make a copy of the stored object, apply
-  /// the updates to the copy and atomically swap the contents of this pointer.
+  /// the updates to the copy and atomically swap the contents of this pointer. Each plan
+  /// node creates a read-only copy of the scheduler's current backend_config_ to use
+  /// during scheduling.
   BackendConfigPtr backend_config_;
 
   /// Protect access to backend_config_ which might otherwise be updated asynchronously
@@ -382,9 +339,6 @@ class SimpleScheduler : public Scheduler {
   BackendConfigPtr GetBackendConfig() const;
   void SetBackendConfig(const BackendConfigPtr& backend_config);
 
-  /// Return a list of all backends registered with the scheduler.
-  void GetAllKnownBackends(BackendList* backends);
-
   /// Add the granted reservation and resources to the active_reservations_ and
   /// active_client_resources_ maps, respectively.
   void AddToActiveResourceMaps(
@@ -517,11 +471,6 @@ class SimpleScheduler : public Scheduler {
   int FindSenderFragment(TPlanNodeId exch_id, int fragment_idx,
       const TQueryExecRequest& exec_request);
 
-  /// Deterministically resolve a host to one of its IP addresses. This method will call
-  /// into the OS, so it can take a long time to return. Use this method to resolve
-  /// hostnames during initialization and while processing statestore updates.
-  static Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip);
-
   friend class impala::SchedulerWrapper;
   FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentDeterministicNonCached);
   FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomNonCached);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/util/network-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc
index bce350d..afa5b32 100644
--- a/be/src/util/network-util.cc
+++ b/be/src/util/network-util.cc
@@ -59,16 +59,18 @@ Status GetHostname(string* hostname) {
   return Status::OK();
 }
 
-Status HostnameToIpAddrs(const string& name, vector<string>* addresses) {
+Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip){
+  // Try to resolve via the operating system.
+  vector<IpAddr> addresses;
   addrinfo hints;
   memset(&hints, 0, sizeof(struct addrinfo));
   hints.ai_family = AF_INET; // IPv4 addresses only
   hints.ai_socktype = SOCK_STREAM;
 
   struct addrinfo* addr_info;
-  if (getaddrinfo(name.c_str(), NULL, &hints, &addr_info) != 0) {
+  if (getaddrinfo(hostname.c_str(), NULL, &hints, &addr_info) != 0) {
     stringstream ss;
-    ss << "Could not find IPv4 address for: " << name;
+    ss << "Could not find IPv4 address for: " << hostname;
     return Status(ss.str());
   }
 
@@ -79,15 +81,32 @@ Status HostnameToIpAddrs(const string& name, vector<string>*
addresses) {
         inet_ntop(AF_INET, &((sockaddr_in*)it->ai_addr)->sin_addr, addr_buf, 64);
     if (result == NULL) {
       stringstream ss;
-      ss << "Could not convert IPv4 address for: " << name;
+      ss << "Could not convert IPv4 address for: " << hostname;
       freeaddrinfo(addr_info);
       return Status(ss.str());
     }
-    addresses->push_back(string(addr_buf));
+    addresses.push_back(string(addr_buf));
     it = it->ai_next;
   }
 
   freeaddrinfo(addr_info);
+
+  if (addresses.empty()) {
+    stringstream ss;
+    ss << "Could not convert IPv4 address for: " << hostname;
+    return Status(ss.str());
+  }
+
+  // RFC 3484 only specifies a partial order for the result of getaddrinfo() so we need to
+  // sort the addresses before picking the first non-localhost one.
+  sort(addresses.begin(), addresses.end());
+
+  // Try to find a non-localhost address, otherwise just use the first IP address
+  // returned.
+  *ip = addresses[0];
+  if (!FindFirstNonLocalhost(addresses, ip)) {
+    VLOG(3) << "Only localhost addresses found for " << hostname;
+  }
   return Status::OK();
 }
 
@@ -128,6 +147,15 @@ TNetworkAddress MakeNetworkAddress(const string& address) {
   return ret;
 }
 
+/// Utility method because Thrift does not supply useful constructors
+TBackendDescriptor MakeBackendDescriptor(const Hostname& hostname, const IpAddr&
ip,
+    int port) {
+  TBackendDescriptor be_desc;
+  be_desc.address = MakeNetworkAddress(hostname, port);
+  be_desc.ip_address = ip;
+  return be_desc;
+}
+
 bool IsWildcardAddress(const string& ipaddress) {
   return ipaddress == "0.0.0.0";
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/util/network-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h
index 57e95a1..315d451 100644
--- a/be/src/util/network-util.h
+++ b/be/src/util/network-util.h
@@ -16,15 +16,23 @@
 // under the License.
 
 #include "common/status.h"
+#include "gen-cpp/StatestoreService_types.h"
 #include "gen-cpp/Types_types.h"
 #include <vector>
 
 namespace impala {
 
-/// Looks up all IP addresses associated with a given hostname. Returns
-/// an error status if any system call failed, otherwise OK. Even if OK
-/// is returned, addresses may still be of zero length.
-Status HostnameToIpAddrs(const std::string& name, std::vector<std::string>* addresses);
+/// Type to store hostnames, which can be rfc1123 hostnames or IPv4 addresses.
+typedef std::string Hostname;
+
+/// Type to store IPv4 addresses.
+typedef std::string IpAddr;
+
+/// Looks up all IP addresses associated with a given hostname and returns one of them via
+/// 'address'. If the IP addresses of a host don't change, then subsequent calls will
+/// always return the same address. Returns an error status if any system call failed,
+/// otherwise OK. Even if OK is returned, addresses may still be of zero length.
+Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip);
 
 /// Finds the first non-localhost IP address in the given list. Returns
 /// true if such an address was found, false otherwise.
@@ -43,6 +51,10 @@ TNetworkAddress MakeNetworkAddress(const std::string& hostname, int
port);
 /// hostname and a port of 0.
 TNetworkAddress MakeNetworkAddress(const std::string& address);
 
+/// Utility method because Thrift does not supply useful constructors
+TBackendDescriptor MakeBackendDescriptor(const Hostname& hostname, const IpAddr&
ip,
+    int port);
+
 /// Returns true if the ip address parameter is the wildcard interface (0.0.0.0)
 bool IsWildcardAddress(const std::string& ipaddress);
 


Mime
View raw message