kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [kudu] 02/06: [master] introduce cache for location mapping assignments
Date Wed, 20 Mar 2019 03:44:27 GMT
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch branch-1.9.x
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit ddfcb873eb251e87049f0e5b3aabb417bd27c7e7
Author: Alexey Serbin <alexey@apache.org>
AuthorDate: Wed Feb 27 21:53:43 2019 -0800

    [master] introduce cache for location mapping assignments
    
    This changelist adds a very primitive cache for location assignments.
    The cache does not prevent the location mapping command from being run
    multiple times for the same key while no entry for that key is cached.
    
    A unit test is also added.
    
    Change-Id: Icb5c436c9433acd87c44c4d81982420f33ebb4a4
    Reviewed-on: http://gerrit.cloudera.org:8080/12634
    Reviewed-by: Will Berkeley <wdberkeley@gmail.com>
    Tested-by: Kudu Jenkins
    (cherry picked from commit ae6bbcaabd20955119f1d945d276589538dae928)
    Reviewed-on: http://gerrit.cloudera.org:8080/12783
    Reviewed-by: Andrew Wong <awong@cloudera.com>
---
 src/kudu/master/CMakeLists.txt         |   2 +
 src/kudu/master/location_cache-test.cc | 179 +++++++++++++++++++++++++++++++++
 src/kudu/master/location_cache.cc      | 152 ++++++++++++++++++++++++++++
 src/kudu/master/location_cache.h       |  88 ++++++++++++++++
 4 files changed, 421 insertions(+)

diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt
index 89fd9c6..79f38ce 100644
--- a/src/kudu/master/CMakeLists.txt
+++ b/src/kudu/master/CMakeLists.txt
@@ -35,6 +35,7 @@ ADD_EXPORTABLE_LIBRARY(master_proto
 set(MASTER_SRCS
   catalog_manager.cc
   hms_notification_log_listener.cc
+  location_cache.cc
   master.cc
   master_cert_authority.cc
   master_options.cc
@@ -79,6 +80,7 @@ SET_KUDU_TEST_LINK_LIBS(
 
 ADD_KUDU_TEST(catalog_manager-test)
 ADD_KUDU_TEST(hms_notification_log_listener-test)
+ADD_KUDU_TEST(location_cache-test DATA_FILES ../scripts/first_argument.sh)
 ADD_KUDU_TEST(master-test RESOURCE_LOCK "master-web-port"
                           DATA_FILES ../scripts/first_argument.sh)
 ADD_KUDU_TEST(mini_master-test RESOURCE_LOCK "master-web-port")
diff --git a/src/kudu/master/location_cache-test.cc b/src/kudu/master/location_cache-test.cc
new file mode 100644
index 0000000..96c1bea
--- /dev/null
+++ b/src/kudu/master/location_cache-test.cc
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/master/location_cache.h"
+
+#include <cstdint>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+METRIC_DECLARE_counter(location_mapping_cache_hits);
+METRIC_DECLARE_counter(location_mapping_cache_queries);
+
+using std::string;
+using std::thread;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace master {
+
+// Targeted test for LocationCache.
+class LocationCacheTest : public KuduTest {
+ protected:
+  void SetUp() override {
+    KuduTest::SetUp();
+    metric_entity_ = METRIC_ENTITY_server.Instantiate(&metric_registry_,
+                                                      "LocationCacheTest");
+  }
+
+  void CheckMetrics(int64_t expected_queries, int64_t expected_hits) {
+    scoped_refptr<Counter> cache_queries(metric_entity_->FindOrCreateCounter(
+        &METRIC_location_mapping_cache_queries));
+    ASSERT_NE(nullptr, cache_queries.get());
+    ASSERT_EQ(expected_queries, cache_queries->value());
+
+    scoped_refptr<Counter> cache_hits(metric_entity_->FindOrCreateCounter(
+        &METRIC_location_mapping_cache_hits));
+    ASSERT_NE(nullptr, cache_hits.get());
+    ASSERT_EQ(expected_hits, cache_hits->value());
+  }
+
+  MetricRegistry metric_registry_;
+  scoped_refptr<MetricEntity> metric_entity_;
+};
+
+TEST_F(LocationCacheTest, EmptyMappingCommand) {
+  LocationCache cache(" ", metric_entity_.get());
+  string location;
+  auto s = cache.GetLocation("na", &location);
+  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+  ASSERT_STR_CONTAINS(
+      s.ToString(), "invalid empty location mapping command");
+  NO_FATALS(CheckMetrics(1, 0));
+}
+
+TEST_F(LocationCacheTest, MappingCommandFailureExitStatus) {
+  LocationCache cache("/sbin/nologin", metric_entity_.get());
+  string location;
+  auto s = cache.GetLocation("na", &location);
+  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+  ASSERT_STR_CONTAINS(
+      s.ToString(), "failed to run location mapping command: ");
+  NO_FATALS(CheckMetrics(1, 0));
+}
+
+TEST_F(LocationCacheTest, MappingCommandEmptyOutput) {
+  LocationCache cache("/bin/cat", metric_entity_.get());
+  string location;
+  auto s = cache.GetLocation("/dev/null", &location);
+  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+  ASSERT_STR_CONTAINS(
+      s.ToString(), "location mapping command returned invalid empty location");
+  NO_FATALS(CheckMetrics(1, 0));
+}
+
+TEST_F(LocationCacheTest, MappingCommandReturnsInvalidLocation) {
+  const string cmd_path = JoinPathSegments(GetTestExecutableDirectory(),
+                                           "testdata/first_argument.sh");
+  const string location_mapping_cmd = Substitute("$0 invalid.location",
+                                                 cmd_path);
+  LocationCache cache(location_mapping_cmd, metric_entity_.get());
+  string location;
+  auto s = cache.GetLocation("na", &location);
+  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+  ASSERT_STR_CONTAINS(
+      s.ToString(), "location mapping command returned invalid location");
+  NO_FATALS(CheckMetrics(1, 0));
+}
+
+TEST_F(LocationCacheTest, HappyPath) {
+  const string kRefLocation = "/ref_location";
+  const string cmd_path = JoinPathSegments(GetTestExecutableDirectory(),
+                                           "testdata/first_argument.sh");
+  const string location_mapping_cmd = Substitute("$0 $1",
+                                                 cmd_path, kRefLocation);
+  LocationCache cache(location_mapping_cmd, metric_entity_.get());
+  NO_FATALS(CheckMetrics(0, 0));
+
+  string location;
+  auto s = cache.GetLocation("key_0", &location);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  ASSERT_EQ(kRefLocation, location);
+  NO_FATALS(CheckMetrics(1, 0));
+
+  s = cache.GetLocation("key_1", &location);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  ASSERT_EQ(kRefLocation, location);
+  NO_FATALS(CheckMetrics(2, 0));
+
+  s = cache.GetLocation("key_1", &location);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  ASSERT_EQ(kRefLocation, location);
+  NO_FATALS(CheckMetrics(3, 1));
+
+  s = cache.GetLocation("key_0", &location);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  ASSERT_EQ(kRefLocation, location);
+  NO_FATALS(CheckMetrics(4, 2));
+}
+
+TEST_F(LocationCacheTest, ConcurrentRequests) {
+  static constexpr auto kNumThreads = 32;
+  const string kRefLocation = "/ref_location";
+  const string cmd_path = JoinPathSegments(GetTestExecutableDirectory(),
+                                           "testdata/first_argument.sh");
+  const string location_mapping_cmd = Substitute("$0 $1",
+                                                 cmd_path, kRefLocation);
+  LocationCache cache(location_mapping_cmd, metric_entity_.get());
+  NO_FATALS(CheckMetrics(0, 0));
+
+  for (auto iter = 0; iter < 2; ++iter) {
+    vector<thread> threads;
+    threads.reserve(kNumThreads);
+    for (auto idx = 0; idx < kNumThreads; ++idx) {
+      threads.emplace_back([&cache, &kRefLocation, idx]() {
+        string location;
+        auto s = cache.GetLocation(Substitute("key_$0", idx), &location);
+        CHECK(s.ok()) << s.ToString();
+        CHECK_EQ(kRefLocation, location);
+      });
+    }
+    for (auto& t : threads) {
+      t.join();
+    }
+    // Expecting to account for every query, and the follow-up iteration
+    // should result in every query hitting the cache.
+    NO_FATALS(CheckMetrics(kNumThreads * (iter + 1),
+                           kNumThreads * iter));
+  }
+}
+
+} // namespace master
+} // namespace kudu
diff --git a/src/kudu/master/location_cache.cc b/src/kudu/master/location_cache.cc
new file mode 100644
index 0000000..b79b259
--- /dev/null
+++ b/src/kudu/master/location_cache.cc
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/master/location_cache.h"
+
+#include <cstdio>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/charset.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/subprocess.h"
+#include "kudu/util/trace.h"
+
+METRIC_DEFINE_counter(server, location_mapping_cache_hits,
+                      "Location Mapping Cache Hits",
+                      kudu::MetricUnit::kCacheHits,
+                      "Number of times location mapping assignment used "
+                      "cached data");
+METRIC_DEFINE_counter(server, location_mapping_cache_queries,
+                      "Location Mapping Cache Queries",
+                      kudu::MetricUnit::kCacheQueries,
+                      "Number of queries to the location mapping cache");
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace master {
+
+namespace {
+// Returns true if 'location' is a valid location string, i.e. it begins with
+// '/' and consists of '/'-separated tokens, each of which contains only
+// characters from the set [a-zA-Z0-9_-.].
+bool IsValidLocation(const string& location) {
+  if (location.empty() || location[0] != '/') {
+    return false;
+  }
+  const strings::CharSet charset("abcdefghijklmnopqrstuvwxyz"
+                                 "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+                                 "0123456789"
+                                 "_-./");
+  for (const auto c : location) {
+    if (!charset.Test(c)) {
+      return false;
+    }
+  }
+  return true;
+}
+} // anonymous namespace
+
+LocationCache::LocationCache(string location_mapping_cmd,
+                             MetricEntity* metric_entity)
+    : location_mapping_cmd_(std::move(location_mapping_cmd)) {
+  if (metric_entity != nullptr) {
+    location_mapping_cache_hits_ = metric_entity->FindOrCreateCounter(
+          &METRIC_location_mapping_cache_hits);
+    location_mapping_cache_queries_ = metric_entity->FindOrCreateCounter(
+          &METRIC_location_mapping_cache_queries);
+  }
+}
+
+Status LocationCache::GetLocation(const string& key, string* location) {
+  if (PREDICT_TRUE(location_mapping_cache_queries_)) {
+    location_mapping_cache_queries_->Increment();
+  }
+  {
+    // First check whether the location for the key has already been assigned.
+    shared_lock<rw_spinlock> l(location_map_lock_);
+    const auto* value_ptr = FindOrNull(location_map_, key);
+    if (value_ptr) {
+      DCHECK(!value_ptr->empty());
+      *location = *value_ptr;
+      if (PREDICT_TRUE(location_mapping_cache_hits_)) {
+        location_mapping_cache_hits_->Increment();
+      }
+      return Status::OK();
+    }
+  }
+  string value;
+  TRACE(Substitute("key $0: assigning location", key));
+  Status s = GetLocationFromLocationMappingCmd(
+      location_mapping_cmd_, key, &value);
+  TRACE(Substitute("key $0: assigned location '$1'", key, value));
+  if (s.ok()) {
+    CHECK(!value.empty());
+    std::lock_guard<rw_spinlock> l(location_map_lock_);
+    // This simple implementation doesn't protect against multiple runs of the
+    // location-mapping command for the same key.
+    InsertIfNotPresent(&location_map_, key, value);
+    *location = value;
+  }
+  return s;
+}
+
+Status LocationCache::GetLocationFromLocationMappingCmd(const string& cmd,
+                                                        const string& key,
+                                                        string* location) {
+  DCHECK(location);
+  vector<string> argv = strings::Split(cmd, " ", strings::SkipEmpty());
+  if (argv.empty()) {
+    return Status::RuntimeError("invalid empty location mapping command");
+  }
+  argv.push_back(key);
+  string stderr, location_temp;
+  Status s = Subprocess::Call(argv, /*stdin_in=*/"", &location_temp, &stderr);
+  if (!s.ok()) {
+    return Status::RuntimeError(
+        Substitute("failed to run location mapping command: $0", s.ToString()),
+        stderr);
+  }
+  StripWhiteSpace(&location_temp);
+  // Special case an empty location for a better error.
+  if (location_temp.empty()) {
+    return Status::RuntimeError(
+        "location mapping command returned invalid empty location");
+  }
+  if (!IsValidLocation(location_temp)) {
+    return Status::RuntimeError(
+        "location mapping command returned invalid location",
+        location_temp);
+  }
+  *location = std::move(location_temp);
+  return Status::OK();
+}
+
+} // namespace master
+} // namespace kudu
diff --git a/src/kudu/master/location_cache.h b/src/kudu/master/location_cache.h
new file mode 100644
index 0000000..b89f2d9
--- /dev/null
+++ b/src/kudu/master/location_cache.h
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+#include <unordered_map>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace master {
+
+// A primitive cache of unlimited capacity to store assigned locations for
+// a key. The cache entries are kept in the cache for the lifetime of the cache
+// itself.
+class LocationCache {
+ public:
+  // The location assignment command is specified by the 'location_mapping_cmd'
+  // parameter (the command might be a script or an executable). The
+  // 'metric_entity' is used to register standard cache counters: total number
+  // of queries and number of cache hits during the cache's lifetime.
+  explicit LocationCache(std::string location_mapping_cmd,
+                         MetricEntity* metric_entity = nullptr);
+  ~LocationCache() = default;
+
+  // Get the location for the specified key. The key is treated as an opaque
+  // identifier.
+  //
+  // If no cached location is found, the location mapping command is run,
+  // caching the result for the lifetime of the cache.
+  //
+  // This method returns an error if there was an issue running the location
+  // assignment command or if the command's output is not a valid location.
+  Status GetLocation(const std::string& key, std::string* location);
+
+ private:
+  // Resolves an opaque 'key' into a location using the command 'cmd'.
+  // The result will be stored in 'location', which must not be null. If there
+  // is an error running the command or the output is invalid, an error Status
+  // will be returned.
+  //
+  // TODO(wdberkeley): Eventually we may want to get multiple locations at once
+  // by giving the location mapping command multiple arguments (like Hadoop).
+  static Status GetLocationFromLocationMappingCmd(const std::string& cmd,
+                                                  const std::string& key,
+                                                  std::string* location);
+
+  // The executable to run when assigning locations for keys which are not yet
+  // in the cache.
+  const std::string location_mapping_cmd_;
+
+  // Counter to track cache hits, i.e. when it was not necessary to run
+  // the location assignment command.
+  scoped_refptr<Counter> location_mapping_cache_hits_;
+
+  // Counter to track overall cache queries, i.e. hits plus misses. Every miss
+  // results in the location assignment command being run.
+  scoped_refptr<Counter> location_mapping_cache_queries_;
+
+  // Spinlock to protect the location assignment map (location_map_).
+  rw_spinlock location_map_lock_;
+
+  // The location assignment map: dictionary of key --> location.
+  std::unordered_map<std::string, std::string> location_map_;
+
+  DISALLOW_COPY_AND_ASSIGN(LocationCache);
+};
+
+} // namespace master
+} // namespace kudu


Mime
View raw message