impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From taras...@apache.org
Subject [06/30] incubator-impala git commit: IMPALA-4669: [KUTIL] Import kudu_util library from kudu@314c9d8
Date Sat, 17 Jun 2017 07:25:32 GMT
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rolling_log.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rolling_log.cc b/be/src/kudu/util/rolling_log.cc
new file mode 100644
index 0000000..486abc0
--- /dev/null
+++ b/be/src/kudu/util/rolling_log.cc
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/rolling_log.h"
+
+#include <unistd.h>
+#include <sys/types.h>
+
+#include <iomanip>
+#include <memory>
+#include <ostream>
+#include <string>
+
+#include <zlib.h>
+
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/env.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/thread_restrictions.h"
+#include "kudu/util/user.h"
+
+using std::ostringstream;
+using std::setw;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+
+// Roll threshold for a newly constructed RollingLog (64 MiB) until it is
+// overridden with SetSizeLimitBytes().
+static constexpr int kDefaultSizeLimitBytes = 64 << 20;
+
+namespace kudu {
+
+RollingLog::RollingLog(Env* env, string log_dir, string log_name)
+    : env_(env),
+      log_dir_(std::move(log_dir)),
+      log_name_(std::move(log_name)),
+      size_limit_bytes_(kDefaultSizeLimitBytes),
+      compress_after_close_(true) {}
+
+// Closes the current file, compressing it if compression is enabled. A
+// destructor cannot hand a Status back to the caller, so any failure is only
+// logged as a warning.
+RollingLog::~RollingLog() {
+  const Status close_status = Close();
+  WARN_NOT_OK(close_status, "Unable to close RollingLog");
+}
+
+// Sets the size threshold that subsequent Append() calls use to decide when to
+// roll to a new file. A non-positive size is a programming error and crashes
+// the process via CHECK.
+void RollingLog::SetSizeLimitBytes(int64_t size) {
+  CHECK_GT(size, 0);
+  size_limit_bytes_ = size;
+}
+
+// Controls whether Close() gzips each file once it has been closed. The flag
+// is read at close time, so it applies to the next file that gets closed.
+void RollingLog::SetCompressionEnabled(bool compress) {
+  this->compress_after_close_ = compress;
+}
+
+// Builds the file name (without the directory) for the log file with the
+// given sequence number:
+//   <program>.<hostname>.<user>.<log_name>.<YYYYmmdd-HHMMSS>.<sequence>.<pid>
+// If the hostname or user name cannot be determined, "unknown_host" or
+// "unknown_user" is substituted so that a name can always be produced.
+string RollingLog::GetLogFileName(int sequence) const {
+  ostringstream str;
+
+  // 1. Program name.
+  str << google::ProgramInvocationShortName();
+
+  // 2. Host name.
+  string hostname;
+  Status s = GetHostname(&hostname);
+  if (!s.ok()) {
+    hostname = "unknown_host";
+  }
+  str << "." << hostname;
+
+  // 3. User name.
+  string user_name;
+  s = GetLoggedInUser(&user_name);
+  if (!s.ok()) {
+    user_name = "unknown_user";
+  }
+  str << "." << user_name;
+
+  // 4. Log name.
+  str << "." << log_name_;
+
+  // 5. Timestamp (local time), as fixed-width YYYYmmdd-HHMMSS.
+  // Implementation cribbed from glog/logging.cc
+  time_t time = static_cast<time_t>(WallTime_Now());
+  struct ::tm tm_time;
+  localtime_r(&time, &tm_time);
+
+  str << ".";
+  // Zero-pad the setw() fields below. The fill character is sticky stream
+  // state, so save the previous one and restore it afterwards. (ios::clear()
+  // resets only the error-state bits, not the formatting.)
+  const char old_fill = str.fill('0');
+  str << 1900+tm_time.tm_year
+      << setw(2) << 1+tm_time.tm_mon
+      << setw(2) << tm_time.tm_mday
+      << '-'
+      << setw(2) << tm_time.tm_hour
+      << setw(2) << tm_time.tm_min
+      << setw(2) << tm_time.tm_sec;
+  str.fill(old_fill);
+
+  // 6. Sequence number.
+  str << "." << sequence;
+
+  // 7. Pid.
+  str << "." << getpid();
+
+  return str.str();
+}
+
+// Creates a brand-new log file and makes it the current file. Candidate names
+// are generated with sequence numbers 0, 1, 2, ... until one that does not
+// already exist is found. Calling this while a file is open crashes (CHECK).
+Status RollingLog::Open() {
+  CHECK(!file_);
+
+  // Logs aren't worth the performance cost of durability. CREATE_NON_EXISTING
+  // makes a name collision surface as AlreadyPresent, which drives the retry.
+  WritableFileOptions opts;
+  opts.sync_on_close = false;
+  opts.mode = Env::CREATE_NON_EXISTING;
+
+  int sequence = 0;
+  while (true) {
+    const string path = JoinPathSegments(log_dir_, GetLogFileName(sequence++));
+    const Status s = env_->NewWritableFile(opts, path, &file_);
+    if (!s.IsAlreadyPresent()) {
+      RETURN_NOT_OK(s);
+      VLOG(1) << "Rolled " << log_name_ << " log to new file: " << path;
+      return Status::OK();
+    }
+    // We already rolled once at this same timestamp; retry with the next
+    // sequence number.
+  }
+}
+
+// Closes the current file, if any, and then optionally compresses it.
+// Closing a log that is not open is a no-op returning OK. If closing the file
+// fails, the error is returned and the file stays the current file.
+// Compression failures are only logged.
+Status RollingLog::Close() {
+  if (file_) {
+    // Copy the name now: file_ is reset before compression runs.
+    const string path = file_->filename();
+    RETURN_NOT_OK_PREPEND(file_->Close(), Substitute("Unable to close $0", path));
+    file_.reset();
+    if (compress_after_close_) {
+      WARN_NOT_OK(CompressFile(path), "Unable to compress old log file");
+    }
+  }
+  return Status::OK();
+}
+
+// Appends 's' to the current log file, opening the log first if it is not
+// already open.
+//
+// If the current file already holds data and appending 's' would push it past
+// the size limit, the log is rolled first. Rolling closes the file
+// (compressing it if enabled) and opens a new one. An empty file is never
+// rolled, so a single record larger than the limit goes into the current file.
+Status RollingLog::Append(StringPiece s) {
+  if (!file_) {
+    RETURN_NOT_OK_PREPEND(Open(), "Unable to open log");
+  }
+
+  // Rolling an empty file cannot make room for an oversized record. It would
+  // only leave an empty (possibly gzipped) file behind, and the record would
+  // still overflow the next file.
+  const auto cur_size = file_->Size();
+  if (cur_size > 0 && cur_size + s.size() > size_limit_bytes_) {
+    RETURN_NOT_OK_PREPEND(Close(), "Unable to roll log");
+    RETURN_NOT_OK_PREPEND(Open(), "Unable to roll log");
+  }
+  return file_->Append(s);
+}
+
+namespace {
+
+// Closes a gzip stream and translates gzclose()'s zlib return code into a
+// Status. Codes without a dedicated mapping become an IOError carrying the
+// numeric value.
+Status GzClose(gzFile f) {
+  const int rc = gzclose(f);
+  if (rc == Z_OK) {
+    return Status::OK();
+  }
+  if (rc == Z_STREAM_ERROR) {
+    return Status::InvalidArgument("Stream not valid");
+  }
+  if (rc == Z_ERRNO) {
+    return Status::IOError("IO Error closing stream");
+  }
+  if (rc == Z_MEM_ERROR) {
+    return Status::RuntimeError("Out of memory");
+  }
+  if (rc == Z_BUF_ERROR) {
+    return Status::IOError("read ended in the middle of a stream");
+  }
+  return Status::IOError("Unknown zlib error", SimpleItoa(rc));
+}
+
+class ScopedGzipCloser {
+ public:
+  explicit ScopedGzipCloser(gzFile f)
+    : file_(f) {
+  }
+
+  ~ScopedGzipCloser() {
+    if (file_) {
+      WARN_NOT_OK(GzClose(file_), "Unable to close gzip stream");
+    }
+  }
+
+  void Cancel() {
+    file_ = nullptr;
+  }
+
+ private:
+  gzFile file_;
+};
+} // anonymous namespace
+
+// Gzips 'path' into '<path>.gz' and then deletes 'path'.
+//
+// We implement this manually with the zlib APIs rather than forking out to
+// '/bin/gzip', because fork() can be expensive in processes that use a large
+// amount of memory, and other threads could end up blocked while the fork is
+// in progress. The zlib stream APIs aren't much code and are less likely to
+// be problematic.
+//
+// On any failure, the original file is left in place, and a partially written
+// '.gz' file is not removed. Failing to delete the original after a
+// successful compression is only logged.
+Status RollingLog::CompressFile(const std::string& path) const {
+  unique_ptr<SequentialFile> in_file;
+  RETURN_NOT_OK_PREPEND(env_->NewSequentialFile(path, &in_file),
+                        "Unable to open input file to compress");
+
+  const string gz_path = path + ".gz";
+  gzFile gzf = gzopen(gz_path.c_str(), "w");
+  if (gzf == nullptr) {
+    return Status::IOError("Unable to open gzip stream");
+  }
+  // Closes the stream on every early-return path below.
+  ScopedGzipCloser closer(gzf);
+
+  // Copy fixed-size chunks from the input into the gzip stream, treating an
+  // empty read as end of file.
+  uint8_t buf[32 * 1024];
+  for (;;) {
+    Slice chunk(buf, arraysize(buf));
+    RETURN_NOT_OK_PREPEND(in_file->Read(&chunk),
+                          "Unable to read from gzip input");
+    if (chunk.size() == 0) {
+      break;
+    }
+    if (gzwrite(gzf, chunk.data(), chunk.size()) == 0) {
+      int errnum;
+      return Status::IOError("Unable to write to gzip output",
+                             gzerror(gzf, &errnum));
+    }
+  }
+
+  // Close explicitly so that a close failure is reported to the caller.
+  closer.Cancel();
+  RETURN_NOT_OK_PREPEND(GzClose(gzf),
+                        "Unable to close gzip output");
+
+  WARN_NOT_OK(env_->DeleteFile(path),
+              "Unable to delete gzip input file after compression");
+  return Status::OK();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rolling_log.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rolling_log.h b/be/src/kudu/util/rolling_log.h
new file mode 100644
index 0000000..2559d6d
--- /dev/null
+++ b/be/src/kudu/util/rolling_log.h
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_ROLLING_LOG_H
+#define KUDU_UTIL_ROLLING_LOG_H
+
+#include <memory>
+#include <string>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Env;
+class WritableFile;
+
+// A simple rolling log.
+//
+// This creates a log which spans multiple files in a specified directory.
+// Whenever appending a piece of data would push the current file past the
+// configured size limit, the log automatically rolls to a new file.
+//
+// The files are named similarly to glog log files and use the following pattern:
+//
+// <log_dir>/<program-name>.<hostname>.<user-name>.<log-name>.<timestamp>.<sequence>.<pid>
+//   log_dir:      the log_dir specified in the constructor
+//   program-name: argv[0], as determined by google::ProgramInvocationShortName()
+//   hostname:     the local machine hostname ("unknown_host" if it cannot be
+//                 determined)
+//   user-name:    the current user name ("unknown_user" if it cannot be
+//                 determined)
+//   log-name:     the log_name specified in the constructor
+//   timestamp:    the local wall clock time when the log file was created, in
+//                 YYYYmmdd-HHMMSS fixed-length format.
+//   sequence:     a sequence number, starting at 0, which is used to disambiguate
+//                 when the log file is rolled multiple times within a second
+//   pid:          the pid of the current process
+//
+// By default, each file is gzip-compressed to '<file>.gz' after it is closed,
+// and the uncompressed file is deleted (see SetCompressionEnabled()).
+//
+// The log implementation does not ensure durability of the log or its files in any way.
+// This class is not thread-safe and must be externally synchronized.
+class RollingLog {
+ public:
+  // 'env' is used for all file operations and must outlive this object.
+  // No file is created until Open() or the first Append().
+  RollingLog(Env* env, std::string log_dir, std::string log_name);
+
+  // Calls Close(); a failure is logged as a warning rather than reported.
+  ~RollingLog();
+
+  // Open the log.
+  // It is optional to call this function. Append() will automatically open
+  // the log as necessary if it is not open.
+  // Must not be called while the log is already open (this crashes via CHECK).
+  // If a file with the generated name already exists, the sequence number is
+  // incremented until an unused name is found.
+  Status Open();
+
+  // Set the size limit for the current and any future log files.
+  // The default is 64MB; 'bytes' must be positive (enforced by CHECK).
+  //
+  // There is no limit on the total number of previous log segments. We rely
+  // on system utilities to clean up old logs to maintain some size limit.
+  void SetSizeLimitBytes(int64_t bytes);
+
+  // If compression is enabled (the default), each log file is gzipped to
+  // '<file>.gz' when it is closed, and the original file is then deleted.
+  // Compression failures are logged, not returned.
+  // NOTE: this requires that the passed-in Env instance is the local file system.
+  void SetCompressionEnabled(bool compress);
+
+  // Append the given data to the current log file.
+  //
+  // If appending this data would cross the configured file size limit, a new file
+  // is created and the data is appended there.
+  //
+  // Note that this is a synchronous API and causes potentially-blocking IO on the
+  // current thread. However, this does not fsync() or otherwise ensure durability
+  // of the appended data.
+  Status Append(StringPiece data);
+
+  // Close the log.
+  // This is a no-op returning OK if no file is open. If closing the file fails,
+  // the error is returned and the file remains the current file.
+  Status Close();
+
+ private:
+  // Returns the file name (without the directory) for the given sequence number.
+  std::string GetLogFileName(int sequence) const;
+
+  // Compress the given path, writing a new file '<path>.gz'.
+  // On success the uncompressed file is deleted.
+  Status CompressFile(const std::string& path) const;
+
+  Env* const env_;
+  const std::string log_dir_;
+  const std::string log_name_;
+
+  // Roll threshold consulted by Append().
+  int64_t size_limit_bytes_;
+
+  // The currently open log file, or null if the log is not open.
+  std::unique_ptr<WritableFile> file_;
+  // Whether Close() compresses the file it has just closed.
+  bool compress_after_close_;
+
+  DISALLOW_COPY_AND_ASSIGN(RollingLog);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_ROLLING_LOG_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rw_mutex-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_mutex-test.cc b/be/src/kudu/util/rw_mutex-test.cc
new file mode 100644
index 0000000..7f9e376
--- /dev/null
+++ b/be/src/kudu/util/rw_mutex-test.cc
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include "kudu/gutil/integral_types.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/rw_mutex.h"
+#include "kudu/util/test_util.h"
+
+using std::lock_guard;
+using std::thread;
+using std::try_to_lock;
+using std::unique_lock;
+using std::vector;
+
+namespace kudu {
+
+// Test fixture that owns one RWMutex, built with the Priority this
+// parameterized test instance is running with.
+class RWMutexTest : public KuduTest,
+                    public ::testing::WithParamInterface<RWMutex::Priority> {
+ public:
+  RWMutexTest() : lock_{GetParam()} {}
+
+ protected:
+  // The lock exercised by every test.
+  RWMutex lock_;
+};
+
+// Instantiate every test for each kind of RWMutex priority. Each TEST_P below
+// therefore runs twice: once with PREFER_READING and once with PREFER_WRITING.
+// The fixture uses the value to construct its RWMutex.
+INSTANTIATE_TEST_CASE_P(Priorities, RWMutexTest,
+                        ::testing::Values(RWMutex::Priority::PREFER_READING,
+                                          RWMutex::Priority::PREFER_WRITING));
+
+// Multi-threaded test that tries to find deadlocks in the RWMutex wrapper.
+// Two blocking writers, two try-lock writers, two blocking readers and two
+// try-lock readers hammer the same lock for one second. There are no
+// assertions: a deadlock would show up as the test hanging in join().
+TEST_P(RWMutexTest, TestDeadlocks) {
+  uint64_t number_of_writes = 0;  // Only modified under the exclusive lock.
+  AtomicInt<uint64_t> number_of_reads(0);  // Readers update it concurrently.
+  AtomicBool done(false);
+
+  // Exclusive access, blocking.
+  auto blocking_writer = [&]() {
+    while (!done.Load()) {
+      lock_guard<RWMutex> l(lock_);
+      number_of_writes++;
+    }
+  };
+  // Exclusive access, non-blocking.
+  auto trying_writer = [&]() {
+    while (!done.Load()) {
+      unique_lock<RWMutex> l(lock_, try_to_lock);
+      if (l.owns_lock()) {
+        number_of_writes++;
+      }
+    }
+  };
+  // Shared access, blocking.
+  auto blocking_reader = [&]() {
+    while (!done.Load()) {
+      shared_lock<RWMutex> l(lock_);
+      number_of_reads.Increment();
+    }
+  };
+  // Shared access, non-blocking.
+  auto trying_reader = [&]() {
+    while (!done.Load()) {
+      shared_lock<RWMutex> l(lock_, try_to_lock);
+      if (l.owns_lock()) {
+        number_of_reads.Increment();
+      }
+    }
+  };
+
+  vector<thread> threads;
+  for (int i = 0; i < 2; i++) {
+    threads.emplace_back(blocking_writer);
+    threads.emplace_back(trying_writer);
+  }
+  for (int i = 0; i < 2; i++) {
+    threads.emplace_back(blocking_reader);
+    threads.emplace_back(trying_reader);
+  }
+
+  SleepFor(MonoDelta::FromSeconds(1));
+  done.Store(true);
+  for (thread& t : threads) {
+    t.join();
+  }
+
+  shared_lock<RWMutex> l(lock_);
+  LOG(INFO) << "Number of writes: " << number_of_writes;
+  LOG(INFO) << "Number of reads: " << number_of_reads.Load();
+}
+
+#ifndef NDEBUG
+// Tests that the RWMutex wrapper catches basic usage errors. This checking is
+// only enabled in debug builds.
+//
+// gtest runs each EXPECT_DEATH statement in a child process, so the lock state
+// left behind by one case does not carry over into the next.
+TEST_P(RWMutexTest, TestLockChecking) {
+  // Recursive acquisition: any lock attempt while this thread already holds
+  // the lock (in either mode) must crash, naming the mode already held.
+  EXPECT_DEATH({
+    lock_.ReadLock();
+    lock_.ReadLock();
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryReadLock());
+    CHECK(lock_.TryReadLock());
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    lock_.ReadLock();
+    lock_.WriteLock();
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryReadLock());
+    CHECK(lock_.TryWriteLock());
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    lock_.WriteLock();
+    lock_.ReadLock();
+  }, "already holding lock for writing");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryWriteLock());
+    CHECK(lock_.TryReadLock());
+  }, "already holding lock for writing");
+
+  EXPECT_DEATH({
+    lock_.WriteLock();
+    lock_.WriteLock();
+  }, "already holding lock for writing");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryWriteLock());
+    CHECK(lock_.TryWriteLock());
+  }, "already holding lock for writing");
+
+  // Unlocking a lock that this thread does not hold at all.
+  EXPECT_DEATH({
+    lock_.ReadUnlock();
+  }, "wasn't holding lock for reading");
+
+  EXPECT_DEATH({
+    lock_.WriteUnlock();
+  }, "wasn't holding lock for writing");
+
+  // Unlocking in the wrong mode: CheckLockState() first rejects holding the
+  // *other* mode, so the message names the mode that is actually held.
+  EXPECT_DEATH({
+    lock_.ReadLock();
+    lock_.WriteUnlock();
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryReadLock());
+    lock_.WriteUnlock();
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    lock_.WriteLock();
+    lock_.ReadUnlock();
+  }, "already holding lock for writing");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryWriteLock());
+    lock_.ReadUnlock();
+  }, "already holding lock for writing");
+}
+#endif
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rw_mutex.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_mutex.cc b/be/src/kudu/util/rw_mutex.cc
new file mode 100644
index 0000000..6dd8f3e
--- /dev/null
+++ b/be/src/kudu/util/rw_mutex.cc
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/rw_mutex.h"
+
+#include <glog/logging.h>
+#include <mutex>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/util/env.h"
+
+using std::lock_guard;
+
+namespace {
+
+// Releases 'rwlock'. It is used by both ReadUnlock() and WriteUnlock().
+// Failures are only checked (DCHECK) in debug builds.
+void unlock_rwlock(pthread_rwlock_t* rwlock) {
+  const int rv = pthread_rwlock_unlock(rwlock);
+  DCHECK_EQ(0, rv) << strerror(rv);
+}
+
+} // anonymous namespace
+
+namespace kudu {
+
+// The default constructor prefers readers and forwards to the Priority
+// overload, so that initialization lives in one place.
+RWMutex::RWMutex() : RWMutex(Priority::PREFER_READING) {}
+
+RWMutex::RWMutex(Priority prio)
+#ifndef NDEBUG
+    : writer_tid_(0)
+#endif
+{
+  Init(prio);
+}
+
+// Initializes native_handle_. On Linux, 'prio' is mapped onto the
+// non-portable (_NP) pthread rwlock "kind" attribute. On other platforms
+// 'prio' is ignored and the default attributes are used.
+void RWMutex::Init(Priority prio) {
+#ifdef __linux__
+  // Adapt from priority to the pthread type.
+  const int kind = (prio == Priority::PREFER_WRITING)
+      ? PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP
+      : PTHREAD_RWLOCK_PREFER_READER_NP;
+
+  // Initialize the new rwlock with the user's preference. The attribute
+  // object is only needed during pthread_rwlock_init().
+  pthread_rwlockattr_t attr;
+  int rv = pthread_rwlockattr_init(&attr);
+  DCHECK_EQ(0, rv) << strerror(rv);
+  rv = pthread_rwlockattr_setkind_np(&attr, kind);
+  DCHECK_EQ(0, rv) << strerror(rv);
+  rv = pthread_rwlock_init(&native_handle_, &attr);
+  DCHECK_EQ(0, rv) << strerror(rv);
+  rv = pthread_rwlockattr_destroy(&attr);
+  DCHECK_EQ(0, rv) << strerror(rv);
+#else
+  int rv = pthread_rwlock_init(&native_handle_, nullptr);
+  DCHECK_EQ(0, rv) << strerror(rv);
+#endif
+}
+
+// Destroys the underlying pthread rwlock. Errors are only checked in debug
+// builds.
+RWMutex::~RWMutex() {
+  const int rv = pthread_rwlock_destroy(&native_handle_);
+  DCHECK_EQ(0, rv) << strerror(rv);
+}
+
+// Blocks until the lock is held in shared (read) mode. In debug builds it
+// first crashes if the calling thread already holds the lock in either mode.
+void RWMutex::ReadLock() {
+  CheckLockState(LockState::NEITHER);
+  const int rv = pthread_rwlock_rdlock(&native_handle_);
+  DCHECK_EQ(0, rv) << strerror(rv);
+  MarkForReading();
+}
+
+// Releases a shared (read) hold on the lock. In debug builds it crashes unless
+// the calling thread holds the lock for reading. The debug bookkeeping is
+// cleared before the pthread lock itself is released.
+void RWMutex::ReadUnlock() {
+  CheckLockState(LockState::READER);
+  UnmarkForReading();
+  unlock_rwlock(&native_handle_);
+}
+
+// Attempts to acquire the lock in shared (read) mode without blocking.
+// Returns true only if the calling thread now holds the lock for reading.
+bool RWMutex::TryReadLock() {
+  CheckLockState(LockState::NEITHER);
+  int rv = pthread_rwlock_tryrdlock(&native_handle_);
+  // EBUSY: the lock is not currently available for reading.
+  // EAGAIN: the maximum number of concurrent read locks has been reached.
+  // Either way the lock was simply not acquired.
+  if (rv == EBUSY || rv == EAGAIN) {
+    return false;
+  }
+  DCHECK_EQ(0, rv) << strerror(rv);
+  if (rv != 0) {
+    // The lock was not acquired. Release builds compile out the DCHECK above,
+    // so report failure instead of claiming ownership of an unheld lock.
+    return false;
+  }
+  MarkForReading();
+  return true;
+}
+
+// Blocks until the lock is held in exclusive (write) mode. In debug builds it
+// first crashes if the calling thread already holds the lock in either mode.
+void RWMutex::WriteLock() {
+  CheckLockState(LockState::NEITHER);
+  const int rv = pthread_rwlock_wrlock(&native_handle_);
+  DCHECK_EQ(0, rv) << strerror(rv);
+  MarkForWriting();
+}
+
+// Releases an exclusive (write) hold on the lock. In debug builds it crashes
+// unless the calling thread holds the lock for writing. The debug bookkeeping
+// is cleared before the pthread lock itself is released.
+void RWMutex::WriteUnlock() {
+  CheckLockState(LockState::WRITER);
+  UnmarkForWriting();
+  unlock_rwlock(&native_handle_);
+}
+
+// Attempts to acquire the lock in exclusive (write) mode without blocking.
+// Returns true only if the calling thread now holds the lock for writing.
+bool RWMutex::TryWriteLock() {
+  CheckLockState(LockState::NEITHER);
+  int rv = pthread_rwlock_trywrlock(&native_handle_);
+  if (rv == EBUSY) {
+    // The lock is currently held, so it cannot be taken for writing.
+    return false;
+  }
+  DCHECK_EQ(0, rv) << strerror(rv);
+  if (rv != 0) {
+    // The lock was not acquired. Release builds compile out the DCHECK above,
+    // so report failure instead of recording ownership of an unheld lock.
+    return false;
+  }
+  MarkForWriting();
+  return true;
+}
+
+#ifndef NDEBUG
+
+// Debug-only: crashes unless the calling thread is recorded as a current
+// reader.
+void RWMutex::AssertAcquiredForReading() const {
+  lock_guard<simple_spinlock> l(tid_lock_);
+  CHECK(ContainsKey(reader_tids_, Env::Default()->gettid()));
+}
+
+// Debug-only: crashes unless the calling thread is recorded as the current
+// writer.
+void RWMutex::AssertAcquiredForWriting() const {
+  lock_guard<simple_spinlock> l(tid_lock_);
+  CHECK_EQ(Env::Default()->gettid(), writer_tid_);
+}
+
+// Debug-only: crashes unless the calling thread's current relationship to the
+// lock matches 'state'. Within each state, the "other mode" check runs first.
+// A mismatched unlock therefore reports the mode that is actually held.
+void RWMutex::CheckLockState(LockState state) const {
+  const pid_t my_tid = Env::Default()->gettid();
+
+  // Snapshot this thread's status under the spinlock, then check it outside.
+  bool is_reader;
+  bool is_writer;
+  {
+    lock_guard<simple_spinlock> guard(tid_lock_);
+    is_reader = reader_tids_.count(my_tid) != 0;
+    is_writer = (writer_tid_ == my_tid);
+  }
+
+  if (state == LockState::NEITHER) {
+    CHECK(!is_reader) << "Invalid state, already holding lock for reading";
+    CHECK(!is_writer) << "Invalid state, already holding lock for writing";
+  } else if (state == LockState::READER) {
+    CHECK(!is_writer) << "Invalid state, already holding lock for writing";
+    CHECK(is_reader) << "Invalid state, wasn't holding lock for reading";
+  } else if (state == LockState::WRITER) {
+    CHECK(!is_reader) << "Invalid state, already holding lock for reading";
+    CHECK(is_writer) << "Invalid state, wasn't holding lock for writing";
+  }
+}
+
+// Debug-only: records the calling thread as a current reader.
+void RWMutex::MarkForReading() {
+  const pid_t tid = Env::Default()->gettid();
+  lock_guard<simple_spinlock> guard(tid_lock_);
+  reader_tids_.insert(tid);
+}
+
+// Debug-only: records the calling thread as the current writer.
+void RWMutex::MarkForWriting() {
+  const pid_t tid = Env::Default()->gettid();
+  lock_guard<simple_spinlock> guard(tid_lock_);
+  writer_tid_ = tid;
+}
+
+// Debug-only: removes the calling thread from the set of current readers.
+void RWMutex::UnmarkForReading() {
+  const pid_t tid = Env::Default()->gettid();
+  lock_guard<simple_spinlock> guard(tid_lock_);
+  reader_tids_.erase(tid);
+}
+
+// Debug-only: clears the recorded writer. A tid of 0 means "no writer", the
+// same value the constructors start with.
+void RWMutex::UnmarkForWriting() {
+  constexpr pid_t kNoWriter = 0;
+  lock_guard<simple_spinlock> guard(tid_lock_);
+  writer_tid_ = kNoWriter;
+}
+
+#endif
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rw_mutex.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_mutex.h b/be/src/kudu/util/rw_mutex.h
new file mode 100644
index 0000000..969f2be
--- /dev/null
+++ b/be/src/kudu/util/rw_mutex.h
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <pthread.h>
+#include <unordered_set>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/locks.h"
+
+namespace kudu {
+
+// Read/write mutex. Implemented as a thin wrapper around pthread_rwlock_t.
+//
+// Although pthread_rwlock_t allows recursive acquisition, this wrapper does
+// not, and will crash in debug mode if recursive acquisition is detected.
+// Debug builds also crash when a thread unlocks a mode it does not hold.
+// They do this by tracking the tids of the current readers and writer.
+// Release builds perform none of this bookkeeping.
+class RWMutex {
+ public:
+
+  // Possible fairness policies for the RWMutex.
+  enum class Priority {
+    // The lock will prioritize readers at the expense of writers.
+    PREFER_READING,
+
+    // The lock will prioritize writers at the expense of readers.
+    //
+    // Care should be taken when using this fairness policy, as it can lead to
+    // unexpected deadlocks (e.g. a writer waiting on the lock will prevent
+    // additional readers from acquiring it).
+    PREFER_WRITING,
+  };
+
+  // Create an RWMutex that prioritizes readers.
+  RWMutex();
+
+  // Create an RWMutex with customized priority. This is a best effort; the
+  // underlying platform may not support custom priorities. (The implementation
+  // only applies the priority when built for Linux.)
+  explicit RWMutex(Priority prio);
+
+  ~RWMutex();
+
+  // Shared (read) mode. ReadLock() blocks until the lock is acquired.
+  // TryReadLock() never blocks and returns whether the lock was acquired.
+  void ReadLock();
+  void ReadUnlock();
+  bool TryReadLock();
+
+  // Exclusive (write) mode. WriteLock() blocks until the lock is acquired.
+  // TryWriteLock() never blocks and returns whether the lock was acquired.
+  void WriteLock();
+  void WriteUnlock();
+  bool TryWriteLock();
+
+  // Crash unless the calling thread holds the lock in the given mode.
+  // These are no-ops in release (NDEBUG) builds.
+#ifndef NDEBUG
+  void AssertAcquiredForReading() const;
+  void AssertAcquiredForWriting() const;
+#else
+  void AssertAcquiredForReading() const {}
+  void AssertAcquiredForWriting() const {}
+#endif
+
+  // Aliases for use with std::lock_guard and kudu::shared_lock.
+  // (std::unique_lock works too.) The plain names map to write mode and the
+  // *_shared names map to read mode.
+  void lock() { WriteLock(); }
+  void unlock() { WriteUnlock(); }
+  bool try_lock() { return TryWriteLock(); }
+  void lock_shared() { ReadLock(); }
+  void unlock_shared() { ReadUnlock(); }
+  bool try_lock_shared() { return TryReadLock(); }
+
+ private:
+  // Initializes native_handle_ according to 'prio'.
+  void Init(Priority prio);
+
+  // Which mode, if any, the calling thread is expected to hold.
+  enum class LockState {
+    NEITHER,
+    READER,
+    WRITER,
+  };
+  // Debug-only validation and bookkeeping of which threads hold the lock;
+  // compiled to empty inline functions in release builds.
+#ifndef NDEBUG
+  void CheckLockState(LockState state) const;
+  void MarkForReading();
+  void MarkForWriting();
+  void UnmarkForReading();
+  void UnmarkForWriting();
+#else
+  void CheckLockState(LockState state) const {}
+  void MarkForReading() {}
+  void MarkForWriting() {}
+  void UnmarkForReading() {}
+  void UnmarkForWriting() {}
+#endif
+
+  pthread_rwlock_t native_handle_;
+
+#ifndef NDEBUG
+  // Protects reader_tids_ and writer_tid_.
+  mutable simple_spinlock tid_lock_;
+
+  // Tracks all current readers by tid.
+  std::unordered_set<pid_t> reader_tids_;
+
+  // Tracks the current writer (if one exists) by tid; 0 when there is none.
+  pid_t writer_tid_;
+#endif
+
+  DISALLOW_COPY_AND_ASSIGN(RWMutex);
+};
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rw_semaphore-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_semaphore-test.cc b/be/src/kudu/util/rw_semaphore-test.cc
new file mode 100644
index 0000000..d12fda1
--- /dev/null
+++ b/be/src/kudu/util/rw_semaphore-test.cc
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/rw_semaphore.h"
+
+using std::thread;
+using std::vector;
+
+namespace kudu {
+// State shared between the reader and writer threads. Every field is accessed
+// only while 'sem' is held: exclusively by writers and by the shutdown signal,
+// and shared by readers.
+struct SharedState {
+  SharedState() : done{false}, int_var{0} {}
+
+  // Set (under the write lock) to tell the worker threads to exit.
+  bool done;
+  // Non-decreasing counter that the writers add to.
+  int64_t int_var;
+  rw_semaphore sem;
+};
+
+// Writer thread body. Under the exclusive lock, it repeatedly adds a growing
+// increment (0, 1, 2, ...) to int_var, and returns once 'done' is observed.
+void Writer(SharedState* state) {
+  for (int i = 0; ; ++i) {
+    std::lock_guard<rw_semaphore> l(state->sem);
+    state->int_var += i;
+    if (state->done) {
+      return;
+    }
+  }
+}
+
+// Reader thread body. Under the shared lock, it verifies that int_var never
+// decreases between observations, and returns once 'done' is observed.
+void Reader(SharedState* state) {
+  // Must be as wide as int_var. The writers' running sum quickly exceeds
+  // INT_MAX, and truncating it into an 'int' would silently weaken the check.
+  int64_t prev_val = 0;
+  while (true) {
+    shared_lock<rw_semaphore> l(state->sem);
+    // The int var should only be seen to increase.
+    CHECK_GE(state->int_var, prev_val);
+    prev_val = state->int_var;
+    if (state->done) {
+      break;
+    }
+  }
+}
+
+// Test which verifies basic functionality of the semaphore.
+// When run under TSAN this also verifies the barriers.
+TEST(RWSemaphoreTest, TestBasicOperation) {
+  SharedState s;
+  // Threads are owned by value; no manual new/delete is needed.
+  vector<thread> threads;
+  // Start 5 readers and writers.
+  for (int i = 0; i < 5; i++) {
+    threads.emplace_back(Reader, &s);
+    threads.emplace_back(Writer, &s);
+  }
+
+  // Let them contend for a short amount of time.
+  SleepFor(MonoDelta::FromMilliseconds(50));
+
+  // Signal them to stop.
+  {
+    std::lock_guard<rw_semaphore> l(s.sem);
+    s.done = true;
+  }
+
+  for (thread& t : threads) {
+    t.join();
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rw_semaphore.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_semaphore.h b/be/src/kudu/util/rw_semaphore.h
new file mode 100644
index 0000000..de0bb59
--- /dev/null
+++ b/be/src/kudu/util/rw_semaphore.h
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_RW_SEMAPHORE_H
+#define KUDU_UTIL_RW_SEMAPHORE_H
+
+#include <boost/smart_ptr/detail/yield_k.hpp>
+#include <glog/logging.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/debug-util.h"
+
+#include "kudu/util/thread.h"
+
+namespace kudu {
+
+// Read-Write semaphore. A 32-bit word whose top bit is the write flag and
+// whose low 31 bits hold the number of readers.
+// A writer sets the write flag and then waits until the existing readers
+// have finished. Readers spin while the write flag is set.
+//
+// This rw-semaphore makes no attempt at fairness, though it does avoid write
+// starvation (no new readers may obtain the lock if a write is waiting).
+//
+// NOTE: this means that it is not safe to reentrantly acquire the read lock,
+// due to the following deadlock:
+//   - T1: acquire read lock
+//   - T2: wait for write lock
+//     (blocks waiting for readers)
+//   - T1: try to acquire read-lock reentrantly
+//     (blocks to avoid starving writers)
+//
+// Given that this is currently based only on spinning (and not futex),
+// it should only be used in cases where the lock is held for very short
+// time intervals.
+//
+// If the semaphore is expected to always be released from the same thread
+// that acquired it, use rw_spinlock instead.
+//
+// In order to support easier debugging of leaked locks, this class can track
+// the stack trace of the last thread to lock it in write mode. To do so,
+// uncomment the following define:
+//   #define RW_SEMAPHORE_TRACK_HOLDER 1
+// ... and then in gdb, print the contents of the semaphore, and you should
+// see the collected stack trace.
+class rw_semaphore {
+ public:
+  rw_semaphore() : state_(0) {
+  }
+  ~rw_semaphore() {}
+
+  // Acquires the lock in shared (read) mode. Spins while the write flag is
+  // set, i.e. while a writer holds the lock or is waiting for readers.
+  void lock_shared() {
+    int loop_count = 0;
+    Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_);
+    while (true) {
+      Atomic32 expected = cur_state & kNumReadersMask;   // I expect no write lock
+      Atomic32 try_new_state = expected + 1;          // Add me as reader
+      cur_state = base::subtle::Acquire_CompareAndSwap(&state_, expected, try_new_state);
+      if (cur_state == expected)
+        break;
+      // Either was already locked by someone else, or CAS failed.
+      boost::detail::yield(loop_count++);
+    }
+  }
+
+  // Releases one shared (read) hold. Only the reader count is decremented;
+  // a write flag set by a waiting writer is preserved.
+  void unlock_shared() {
+    int loop_count = 0;
+    Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_);
+    while (true) {
+      DCHECK_GT(cur_state & kNumReadersMask, 0)
+        << "unlock_shared() called when there are no shared locks held";
+      Atomic32 expected = cur_state;           // I expect a write lock and other readers
+      Atomic32 try_new_state = expected - 1;   // Drop me as reader
+      cur_state = base::subtle::Release_CompareAndSwap(&state_, expected, try_new_state);
+      if (cur_state == expected)
+        break;
+      // The state changed concurrently (CAS failed); retry with the new value.
+      boost::detail::yield(loop_count++);
+    }
+  }
+
+  // Tries to acquire a write lock, if no one else has it.
+  // Returns false immediately if another writer holds (or waits for) the
+  // lock. Otherwise retries on CAS failure, waits for the existing readers
+  // to complete, and returns true.
+  bool try_lock() {
+    int loop_count = 0;
+    Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_);
+    while (true) {
+      // someone else has already the write lock
+      if (cur_state & kWriteFlag)
+        return false;
+
+      Atomic32 expected = cur_state & kNumReadersMask;   // I expect some 0+ readers
+      Atomic32 try_new_state = kWriteFlag | expected;    // I want to lock the other writers
+      cur_state = base::subtle::Acquire_CompareAndSwap(&state_, expected, try_new_state);
+      if (cur_state == expected)
+        break;
+      // Either was already locked by someone else, or CAS failed.
+      boost::detail::yield(loop_count++);
+    }
+
+    WaitPendingReaders();
+
+#ifndef NDEBUG
+    // Record the holder exactly like lock() does, so debug state is
+    // consistent regardless of which method acquired the write lock.
+    writer_tid_ = Thread::CurrentThreadId();
+#endif // NDEBUG
+    RecordLockHolderStack();
+    return true;
+  }
+
+  // Acquires the lock in exclusive (write) mode: sets the write flag (which
+  // blocks new readers) and then spins until existing readers drain.
+  void lock() {
+    int loop_count = 0;
+    Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_);
+    while (true) {
+      Atomic32 expected = cur_state & kNumReadersMask;   // I expect some 0+ readers
+      Atomic32 try_new_state = kWriteFlag | expected;    // I want to lock the other writers
+      // Note: we use NoBarrier here because we'll do the Acquire barrier down below
+      // in WaitPendingReaders
+      cur_state = base::subtle::NoBarrier_CompareAndSwap(&state_, expected, try_new_state);
+      if (cur_state == expected)
+        break;
+      // Either was already locked by someone else, or CAS failed.
+      boost::detail::yield(loop_count++);
+    }
+
+    WaitPendingReaders();
+
+#ifndef NDEBUG
+    writer_tid_ = Thread::CurrentThreadId();
+#endif // NDEBUG
+    RecordLockHolderStack();
+  }
+
+  // Releases the write lock. Expects no readers to be registered.
+  void unlock() {
+    // I expect to be the only writer
+    DCHECK_EQ(base::subtle::NoBarrier_Load(&state_), kWriteFlag);
+
+#ifndef NDEBUG
+    writer_tid_ = -1; // Invalid tid.
+#endif // NDEBUG
+
+    ResetLockHolderStack();
+    // Reset: no writers & no readers.
+    base::subtle::Release_Store(&state_, 0);
+  }
+
+  // Return true if the lock is currently held for write by any thread.
+  // See simple_semaphore::is_locked() for details about where this is useful.
+  bool is_write_locked() const {
+    return base::subtle::NoBarrier_Load(&state_) & kWriteFlag;
+  }
+
+  // Return true if the lock is currently held, either for read or write
+  // by any thread.
+  // See simple_semaphore::is_locked() for details about where this is useful.
+  bool is_locked() const {
+    return base::subtle::NoBarrier_Load(&state_);
+  }
+
+ private:
+  static const uint32_t kNumReadersMask = 0x7fffffff;
+  // Unsigned literal: '1 << 31' would shift into the sign bit of a signed int.
+  static const uint32_t kWriteFlag = 1U << 31;
+
+#ifdef RW_SEMAPHORE_TRACK_HOLDER
+  StackTrace writer_stack_;
+  void RecordLockHolderStack() {
+    writer_stack_.Collect();
+  }
+  void ResetLockHolderStack() {
+    writer_stack_.Reset();
+  }
+#else
+  void RecordLockHolderStack() {
+  }
+  void ResetLockHolderStack() {
+  }
+#endif
+
+  // Spins until the reader count drops to zero. The Acquire load pairs with
+  // the Release CAS in unlock_shared().
+  void WaitPendingReaders() {
+    int loop_count = 0;
+    while ((base::subtle::Acquire_Load(&state_) & kNumReadersMask) > 0) {
+      boost::detail::yield(loop_count++);
+    }
+  }
+
+  volatile Atomic32 state_;
+#ifndef NDEBUG
+  // Tid of the current write-lock holder, or -1 when not write-locked.
+  int64_t writer_tid_ = -1;
+#endif // NDEBUG
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_RW_SEMAPHORE_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rwc_lock-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rwc_lock-test.cc b/be/src/kudu/util/rwc_lock-test.cc
new file mode 100644
index 0000000..99c8071
--- /dev/null
+++ b/be/src/kudu/util/rwc_lock-test.cc
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <mutex>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/util/rwc_lock.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/locks.h"
+
+namespace kudu {
+
+using base::subtle::NoBarrier_Load;
+using base::subtle::Release_Store;
+using std::string;
+using std::thread;
+using std::vector;
+
+// Test fixture; adds nothing beyond the common KuduTest setup.
+class RWCLockTest : public KuduTest {
+};
+
+// Holds counters of how many threads hold the lock in each of the
+// provided modes. Every adjustment is made under 'lock' and re-checks the
+// invariants implied by the RWCLock compatibility matrix.
+struct LockHoldersCount {
+  LockHoldersCount()
+    : num_readers(0),
+      num_writers(0),
+      num_committers(0) {
+  }
+
+  // Check the invariants of the lock counts.
+  void CheckInvariants() {
+    // A negative count means a mode was released more often than acquired.
+    CHECK_GE(num_readers, 0);
+    CHECK_GE(num_writers, 0);
+    CHECK_GE(num_committers, 0);
+
+    // At no time should we have more than one writer or committer.
+    CHECK_LE(num_writers, 1);
+    CHECK_LE(num_committers, 1);
+
+    // Write and Commit are mutually incompatible, so at most one thread can
+    // be in either of those modes at any time.
+    CHECK_LE(num_writers + num_committers, 1);
+
+    // If we have any readers, then we should not have any committers.
+    if (num_readers > 0) {
+      CHECK_EQ(num_committers, 0);
+    }
+  }
+
+  void AdjustReaders(int delta) {
+    std::lock_guard<simple_spinlock> l(lock);
+    num_readers += delta;
+    CheckInvariants();
+  }
+
+  void AdjustWriters(int delta) {
+    std::lock_guard<simple_spinlock> l(lock);
+    num_writers += delta;
+    CheckInvariants();
+  }
+
+  void AdjustCommitters(int delta) {
+    std::lock_guard<simple_spinlock> l(lock);
+    num_committers += delta;
+    CheckInvariants();
+  }
+
+  int num_readers;
+  int num_writers;
+  int num_committers;
+  simple_spinlock lock;
+};
+
+// State shared by the reader and writer threads of the test.
+struct SharedState {
+  LockHoldersCount counts;
+  RWCLock rwc_lock;
+  // Non-zero once the threads should exit. Zero-initialized so the struct is
+  // well-defined even before the test body stores to it.
+  Atomic32 stop = 0;
+};
+
+// Repeatedly takes and releases the read lock, registering itself as a
+// reader while holding it, until 'stop' becomes non-zero.
+void ReaderThread(SharedState* state) {
+  for (;;) {
+    if (NoBarrier_Load(&state->stop)) {
+      break;
+    }
+    RWCLock& rwc = state->rwc_lock;
+    rwc.ReadLock();
+    state->counts.AdjustReaders(1);
+    state->counts.AdjustReaders(-1);
+    rwc.ReadUnlock();
+  }
+}
+
+// Repeatedly takes the write lock, upgrades it to the commit lock and
+// releases it, registering each mode in the shared counters, until 'stop'
+// becomes non-zero.
+void WriterThread(SharedState* state) {
+  while (!NoBarrier_Load(&state->stop)) {
+    state->rwc_lock.WriteLock();
+    state->counts.AdjustWriters(1);
+
+    state->rwc_lock.UpgradeToCommitLock();
+    state->counts.AdjustWriters(-1);
+    state->counts.AdjustCommitters(1);
+
+    state->counts.AdjustCommitters(-1);
+    state->rwc_lock.CommitUnlock();
+  }
+}
+
+
+TEST_F(RWCLockTest, TestCorrectBehavior) {
+  SharedState state;
+  Release_Store(&state.stop, 0);
+
+  const int kNumWriters = 5;
+  const int kNumReaders = 5;
+
+  vector<thread> threads;
+  threads.reserve(kNumWriters + kNumReaders);
+  for (int w = 0; w < kNumWriters; w++) {
+    threads.emplace_back(WriterThread, &state);
+  }
+  for (int r = 0; r < kNumReaders; r++) {
+    threads.emplace_back(ReaderThread, &state);
+  }
+
+  // Let the threads contend; run longer when slow tests are enabled.
+  const MonoDelta run_time = AllowSlowTests() ? MonoDelta::FromSeconds(1)
+                                              : MonoDelta::FromMilliseconds(100);
+  SleepFor(run_time);
+
+  Release_Store(&state.stop, 1);
+  for (auto& worker : threads) {
+    worker.join();
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rwc_lock.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rwc_lock.cc b/be/src/kudu/util/rwc_lock.cc
new file mode 100644
index 0000000..efe3ccc
--- /dev/null
+++ b/be/src/kudu/util/rwc_lock.cc
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/rwc_lock.h"
+
+#include <glog/logging.h>
+
+#ifndef NDEBUG
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/env.h"
+#include "kudu/util/thread.h"
+#endif // NDEBUG
+
+namespace kudu {
+
+// Both condition variables are bound to lock_, which guards all state.
+RWCLock::RWCLock()
+  : no_mutators_(&lock_),
+    no_readers_(&lock_),
+    reader_count_(0),
+    write_locked_(false)
+#ifndef NDEBUG
+    , last_writer_tid_(0),
+    last_writelock_acquire_time_(0)
+#endif // NDEBUG
+{
+#ifndef NDEBUG
+  // Start with an empty debug backtrace string.
+  last_writer_backtrace_[0] = '\0';
+#endif // NDEBUG
+}
+
+// The lock must not be destroyed while any reader still holds it; this is
+// enforced with a CHECK (not a DCHECK), so it applies in release builds too.
+RWCLock::~RWCLock() {
+  CHECK_EQ(reader_count_, 0);
+}
+
+// Registers a reader. Blocks while a committer holds lock_ (see
+// UpgradeToCommitLock()), which is how Commit excludes new readers.
+void RWCLock::ReadLock() {
+  MutexLock guard(lock_);
+  ++reader_count_;
+}
+
+// Unregisters a reader; the last one out wakes a thread waiting in
+// UpgradeToCommitLock().
+void RWCLock::ReadUnlock() {
+  MutexLock guard(lock_);
+  DCHECK_GT(reader_count_, 0);
+  if (--reader_count_ == 0) {
+    no_readers_.Signal();
+  }
+}
+
+bool RWCLock::HasReaders() const {
+  MutexLock guard(lock_);
+  const bool has_readers = reader_count_ > 0;
+  return has_readers;
+}
+
+// Returns true if the calling thread holds the write lock (debug builds), or
+// if any thread holds it (release builds).
+bool RWCLock::HasWriteLock() const {
+  MutexLock l(lock_);
+#ifndef NDEBUG
+  // last_writer_tid_ is not cleared by WriteUnlock()/CommitUnlock(), so the
+  // lock must also be currently held; otherwise the most recent holder would
+  // keep seeing 'true' after releasing it.
+  return write_locked_ && last_writer_tid_ == Thread::CurrentThreadId();
+#else
+  return write_locked_;
+#endif
+}
+
+// Acquires the write lock, waiting until no other thread holds it in Write
+// or Commit mode. Readers are not excluded.
+void RWCLock::WriteLock() {
+  MutexLock guard(lock_);
+  for (;;) {
+    if (!write_locked_) {
+      break;
+    }
+    no_mutators_.Wait();
+  }
+  write_locked_ = true;
+#ifndef NDEBUG
+  // Debug-only bookkeeping about the new holder.
+  last_writelock_acquire_time_ = GetCurrentTimeMicros();
+  last_writer_tid_ = Thread::CurrentThreadId();
+  HexStackTraceToString(last_writer_backtrace_, kBacktraceBufSize);
+#endif // NDEBUG
+}
+
+// Releases the write lock without ever having upgraded to Commit mode.
+void RWCLock::WriteUnlock() {
+  MutexLock guard(lock_);
+  DCHECK(write_locked_);
+  write_locked_ = false;
+#ifndef NDEBUG
+  last_writer_backtrace_[0] = '\0';
+#endif // NDEBUG
+  // Only one waiting mutator can win the lock, so waking one suffices.
+  no_mutators_.Signal();
+}
+
+// Upgrades a held Write lock to the Commit lock. lock_ is acquired here and
+// intentionally left locked on return (CommitUnlock() releases it); holding
+// it blocks ReadLock() and WriteLock() in other threads.
+void RWCLock::UpgradeToCommitLock() {
+  lock_.lock();
+  DCHECK(write_locked_);
+  // Wait for the existing readers to drain. no_readers_ is bound to lock_ in
+  // the constructor, and its Wait() is presumably releasing lock_ while
+  // asleep so ReadUnlock() can run and signal it (NOTE(review): confirm).
+  while (reader_count_ > 0) {
+    no_readers_.Wait();
+  }
+  DCHECK(write_locked_);
+
+  // Leaves the lock held, which prevents any new readers
+  // or writers.
+}
+
+// Releases the Commit lock (and with it the Write lock). Expected to be
+// called by the thread that called UpgradeToCommitLock(), since it unlocks
+// lock_ that was locked there.
+void RWCLock::CommitUnlock() {
+  DCHECK_EQ(0, reader_count_);
+  write_locked_ = false;
+#ifndef NDEBUG
+  last_writer_backtrace_[0] = '\0';
+#endif // NDEBUG
+  // Wake every thread blocked in WriteLock(); they re-check write_locked_
+  // once lock_ is released on the next line.
+  no_mutators_.Broadcast();
+  lock_.unlock();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rwc_lock.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rwc_lock.h b/be/src/kudu/util/rwc_lock.h
new file mode 100644
index 0000000..6d4cb70
--- /dev/null
+++ b/be/src/kudu/util/rwc_lock.h
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_RWC_LOCK_H
+#define KUDU_UTIL_RWC_LOCK_H
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/mutex.h"
+
+namespace kudu {
+
+// A read-write-commit lock.
+//
+// This lock has three modes: read, write, and commit.
+// The lock compatibility matrix is as follows:
+//
+//           Read    Write    Commit
+//  Read      X        X
+//  Write     X
+//  Commit
+//
+// An 'X' indicates that the two types of locks may be
+// held at the same time.
+//
+// In prose:
+// - Multiple threads may hold the Read lock at the same time.
+// - A single thread may hold the Write lock, potentially at the
+//   same time as any number of readers.
+// - A single thread may hold the Commit lock, but this lock is completely
+//   exclusive (no concurrent readers or writers).
+//
+// A typical use case for this type of lock is when a structure is read often,
+// occasionally updated, and the update operation can take a long time. In this
+// use case, the readers simply use ReadLock() and ReadUnlock(), while the
+// writer uses a copy-on-write technique like:
+//
+//   obj->lock.WriteLock();
+//   // NOTE: cannot safely mutate obj->state directly here, since readers
+//   // may be concurrent! So, we make a local copy to mutate.
+//   my_local_copy = obj->state;
+//   SomeLengthyMutation(my_local_copy);
+//   obj->lock.UpgradeToCommitLock();
+//   obj->state = my_local_copy;
+//   obj->lock.CommitUnlock();
+//
+// This is more efficient than a standard Reader-Writer lock since the lengthy
+// mutation is only protected against other concurrent mutators, and readers
+// may continue to run with no contention.
+//
+// For the common pattern described above, the 'CowObject<>' template class defined
+// in cow_object.h is more convenient than manual locking.
+//
+// NOTE: this implementation currently does not implement any starvation protection
+// or fairness. If the read lock is being constantly acquired (i.e. the reader count
+// never drops to 0) then UpgradeToCommitLock() may block arbitrarily long.
+//
+// NOTE: the Commit lock is implemented by keeping the internal mutex locked
+// from UpgradeToCommitLock() until CommitUnlock(). Other methods of this
+// class also lock that mutex, so the committing thread should not call them
+// before CommitUnlock() (NOTE(review): this self-deadlocks unless Mutex is
+// recursive -- confirm).
+class RWCLock {
+ public:
+  RWCLock();
+  ~RWCLock();
+
+  // Acquire the lock in read mode. Upon return, guarantees that:
+  // - Other threads may concurrently hold the lock for Read.
+  // - Either zero or one thread may hold the lock for Write.
+  // - No threads hold the lock for Commit.
+  void ReadLock();
+  void ReadUnlock();
+
+  // Return true if there are any readers currently holding the lock.
+  // Useful for debug assertions.
+  bool HasReaders() const;
+
+  // Return true if the current thread holds the write lock.
+  //
+  // In DEBUG mode the tid of the thread that most recently acquired the
+  // write lock is recorded and compared with the calling thread's tid.
+  // In non-DEBUG mode, this may sometimes return true even if another thread
+  // is in fact the holder.
+  // Thus, this is only really useful in the context of a DCHECK assertion.
+  bool HasWriteLock() const;
+
+  // Boost-like wrappers for shared (read) mode, so boost lock guards work.
+  void lock_shared() { ReadLock(); }
+  void unlock_shared() { ReadUnlock(); }
+
+  // Acquire the lock in write mode. Upon return, guarantees that:
+  // - Other threads may concurrently hold the lock for Read.
+  // - No other threads hold the lock for Write or Commit.
+  void WriteLock();
+  void WriteUnlock();
+
+  // Boost-like wrappers mapping lock()/unlock() onto Write mode.
+  void lock() { WriteLock(); }
+  void unlock() { WriteUnlock(); }
+
+  // Upgrade the lock from Write mode to Commit mode.
+  // Requires that the current thread holds the lock in Write mode.
+  // Upon return, guarantees:
+  // - No other thread holds the lock in any mode.
+  void UpgradeToCommitLock();
+  // Release the Commit lock (and the Write lock along with it).
+  void CommitUnlock();
+
+ private:
+  // Lock which protects reader_count_ and write_locked_.
+  // Additionally, while the commit lock is held, the
+  // locking thread holds this mutex, which prevents any new
+  // threads from obtaining the lock in any mode.
+  // Must be declared before the condition variables, which are bound to it.
+  mutable Mutex lock_;
+  // no_mutators_: signalled/broadcast when the write lock is released.
+  // no_readers_: signalled when reader_count_ drops to zero.
+  ConditionVariable no_mutators_, no_readers_;
+  // Number of threads currently holding the lock in Read mode.
+  int reader_count_;
+  // True while some thread holds the lock in Write (or Commit) mode.
+  bool write_locked_;
+
+#ifndef NDEBUG
+  // Debug-only information about the most recent write-lock holder.
+  static const int kBacktraceBufSize = 1024;
+  int64_t last_writer_tid_;
+  int64_t last_writelock_acquire_time_;
+  char last_writer_backtrace_[kBacktraceBufSize];
+#endif // NDEBUG
+
+  DISALLOW_COPY_AND_ASSIGN(RWCLock);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_RWC_LOCK_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/safe_math-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/safe_math-test.cc b/be/src/kudu/util/safe_math-test.cc
new file mode 100644
index 0000000..d3a81c6
--- /dev/null
+++ b/be/src/kudu/util/safe_math-test.cc
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <stdint.h>
+
+// Must come before gtest.h.
+#include "kudu/gutil/mathlimits.h"
+
+#include <gtest/gtest.h>
+#include "kudu/util/safe_math.h"
+
+namespace kudu {
+// Checks that AddWithOverflowCheck(a, b) reports 'expected' as its overflow
+// status and, when no overflow is reported, returns the plain sum.
+template<typename T>
+static void DoTest(T a, T b, bool expected) {
+  SCOPED_TRACE(a);
+  SCOPED_TRACE(b);
+  bool overflow = false;
+  const T ret = AddWithOverflowCheck(a, b, &overflow);
+  EXPECT_EQ(overflow, expected);
+  if (overflow) {
+    // The sum is only compared when no overflow was reported.
+    return;
+  }
+  EXPECT_EQ(ret, a + b);
+}
+
+TEST(TestSafeMath, TestSignedInts) {
+  // Overflow above max of range.
+  DoTest<int32_t>(MathLimits<int32_t>::kMax - 10, 15, true);
+  DoTest<int32_t>(MathLimits<int32_t>::kMax - 10, 10, false);
+
+  // Underflow below min of range.
+  DoTest<int32_t>(MathLimits<int32_t>::kMin + 10, -15, true);
+  DoTest<int32_t>(MathLimits<int32_t>::kMin + 10, -5, false);
+
+  // Operands of opposite sign can never overflow.
+  DoTest<int32_t>(MathLimits<int32_t>::kMax, MathLimits<int32_t>::kMin, false);
+  DoTest<int32_t>(MathLimits<int32_t>::kMin, MathLimits<int32_t>::kMax, false);
+
+  // The same boundaries for 64-bit integers.
+  DoTest<int64_t>(MathLimits<int64_t>::kMax - 10, 15, true);
+  DoTest<int64_t>(MathLimits<int64_t>::kMax - 10, 10, false);
+  DoTest<int64_t>(MathLimits<int64_t>::kMin + 10, -15, true);
+  DoTest<int64_t>(MathLimits<int64_t>::kMin + 10, -5, false);
+}
+
+TEST(TestSafeMath, TestUnsignedInts) {
+  // Overflow above max of range.
+  const uint32_t near_max = MathLimits<uint32_t>::kMax - 10;
+  DoTest<uint32_t>(near_max, 15, true);
+  DoTest<uint32_t>(near_max, 10, false);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/safe_math.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/safe_math.h b/be/src/kudu/util/safe_math.h
new file mode 100644
index 0000000..4c126dd
--- /dev/null
+++ b/be/src/kudu/util/safe_math.h
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Inline functions for doing overflow-safe operations on integers.
+// These should be used when doing bounds checks on user-provided data,
+// for example.
+// See also: https://www.securecoding.cert.org/confluence/display/cplusplus/INT32-CPP.+Ensure+that+operations+on+signed+integers+do+not+result+in+overflow
+#ifndef KUDU_UTIL_SAFE_MATH_H
+#define KUDU_UTIL_SAFE_MATH_H
+
+#include <type_traits>
+
+#include "kudu/gutil/mathlimits.h"
+
+namespace kudu {
+
+namespace safe_math_internal {
+
+// Primary template. Only the signed (true) and unsigned (false)
+// specializations below are ever instantiated.
+template<typename Type, bool kSigned>
+struct WithOverflowCheck {
+};
+
+
+// Specialization for signed types.
+template<typename Type>
+struct WithOverflowCheck<Type, true> {
+  // Returns the (wrapped) sum of 'a' and 'b' and sets *overflowed to whether
+  // the mathematical sum falls outside Type's range.
+  static inline Type Add(Type a, Type b, bool *overflowed) {
+    // Implementation from the CERT article referenced in the file header.
+    *overflowed = (((a > 0) && (b > 0) && (a > (MathLimits<Type>::kMax - b))) ||
+                   ((a < 0) && (b < 0) && (a < (MathLimits<Type>::kMin - b))));
+    // Evaluating 'a + b' directly is undefined behavior for signed types
+    // when it overflows, so add in the unsigned counterpart (which wraps
+    // modulo 2^N) and convert back. That conversion is implementation-defined
+    // before C++20; GCC and Clang define it as modulo-2^N wrapping.
+    typedef typename std::make_unsigned<Type>::type UnsignedType;
+    return static_cast<Type>(static_cast<UnsignedType>(a) +
+                             static_cast<UnsignedType>(b));
+  }
+};
+
+// Specialization for unsigned types. Unsigned addition wraps, so overflow
+// happened exactly when the wrapped sum is smaller than an operand.
+template<typename Type>
+struct WithOverflowCheck<Type, false> {
+  static inline Type Add(Type a, Type b, bool *overflowed) {
+    const Type sum = a + b;
+    *overflowed = sum < a;
+    return sum;
+  }
+};
+
+} // namespace safe_math_internal
+
+// Add 'a' and 'b', and set *overflowed to true if overflow occurred
+// (false otherwise). Dispatches on MathLimits<Type>::kIsSigned.
+template<typename Type>
+inline Type AddWithOverflowCheck(Type a, Type b, bool *overflowed) {
+  using Impl = safe_math_internal::WithOverflowCheck<Type, MathLimits<Type>::kIsSigned>;
+  return Impl::Add(a, b, overflowed);
+}
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/scoped_cleanup-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/scoped_cleanup-test.cc b/be/src/kudu/util/scoped_cleanup-test.cc
new file mode 100644
index 0000000..fad50a0
--- /dev/null
+++ b/be/src/kudu/util/scoped_cleanup-test.cc
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/scoped_cleanup.h"
+
+#include <gtest/gtest.h>
+
+namespace kudu {
+
+// The cleanup runs when the ScopedCleanup leaves scope.
+TEST(ScopedCleanup, TestCleanup) {
+  int var = 0;
+  {
+    const int saved = var;
+    auto cleanup = MakeScopedCleanup([&]() { var = saved; });
+    var = 42;
+  }
+  ASSERT_EQ(0, var);
+}
+
+// A cancelled cleanup does not run when leaving scope.
+TEST(ScopedCleanup, TestCancelCleanup) {
+  int var = 0;
+  {
+    const int saved = var;
+    auto cleanup = MakeScopedCleanup([&]() { var = saved; });
+    var = 42;
+    cleanup.cancel();
+  }
+  ASSERT_EQ(42, var);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/scoped_cleanup.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/scoped_cleanup.h b/be/src/kudu/util/scoped_cleanup.h
new file mode 100644
index 0000000..e989331
--- /dev/null
+++ b/be/src/kudu/util/scoped_cleanup.h
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <utility>
+
+namespace kudu {
+
+// A scoped object which runs a cleanup function when going out of scope. Can
+// be used for scoped resource cleanup.
+//
+// The object is movable but not copyable. A moved-from instance is
+// cancelled, so the cleanup function runs at most once even when the object
+// is moved (for example when returned from MakeScopedCleanup()).
+template<typename F>
+class ScopedCleanup {
+ public:
+  explicit ScopedCleanup(F f)
+      : cancelled_(false),
+        f_(std::move(f)) {
+  }
+
+  // Takes over 'other''s pending cleanup and cancels it in 'other'.
+  ScopedCleanup(ScopedCleanup&& other)
+      : cancelled_(other.cancelled_),
+        f_(std::move(other.f_)) {
+    other.cancel();
+  }
+
+  // A copy would run the same cleanup a second time.
+  ScopedCleanup(const ScopedCleanup&) = delete;
+  ScopedCleanup& operator=(const ScopedCleanup&) = delete;
+  ScopedCleanup& operator=(ScopedCleanup&&) = delete;
+
+  ~ScopedCleanup() {
+    if (!cancelled_) {
+      f_();
+    }
+  }
+
+  // Prevents the cleanup function from running on destruction.
+  void cancel() { cancelled_ = true; }
+
+ private:
+  bool cancelled_;
+  F f_;
+};
+
+// Creates a new scoped cleanup instance with the provided function.
+// The function object is moved, not copied, into the returned instance.
+template<typename F>
+ScopedCleanup<F> MakeScopedCleanup(F f) {
+  return ScopedCleanup<F>(std::move(f));
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/semaphore.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/semaphore.cc b/be/src/kudu/util/semaphore.cc
new file mode 100644
index 0000000..985da44
--- /dev/null
+++ b/be/src/kudu/util/semaphore.cc
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/semaphore.h"
+
+#include <semaphore.h>
+#include <glog/logging.h>
+#include "kudu/gutil/walltime.h"
+namespace kudu {
+
+// Creates a process-private semaphore (pshared = 0) holding 'capacity' units.
+Semaphore::Semaphore(int capacity) {
+  DCHECK_GE(capacity, 0);
+  const int rc = sem_init(&sem_, 0, capacity);
+  if (rc != 0) {
+    Fatal("init");
+  }
+}
+
+Semaphore::~Semaphore() {
+  const int rc = sem_destroy(&sem_);
+  if (rc != 0) {
+    Fatal("destroy");
+  }
+}
+
+// Blocks until a unit is available, retrying when a signal interrupts the
+// wait; any other failure is fatal.
+void Semaphore::Acquire() {
+  for (;;) {
+    if (sem_wait(&sem_) == 0) {
+      // TODO: would be nice to track acquisition time, etc.
+      return;
+    }
+    if (errno != EINTR) {
+      Fatal("wait");
+    }
+  }
+}
+
+// Non-blocking acquire. Returns false when no unit is available or the call
+// was interrupted; any other failure is fatal.
+bool Semaphore::TryAcquire() {
+  if (sem_trywait(&sem_) == 0) {
+    return true;
+  }
+  const int err = errno;
+  if (err == EAGAIN || err == EINTR) {
+    return false;
+  }
+  Fatal("trywait");
+}
+
+// Waits up to 'timeout' for a unit. Returns false on timeout; interrupted
+// waits are resumed against the same deadline.
+bool Semaphore::TimedAcquire(const MonoDelta& timeout) {
+  // sem_timedwait() takes an absolute deadline, so convert the relative
+  // timeout using the current wall-clock time.
+  const int64_t deadline_micros = GetCurrentTimeMicros() + timeout.ToMicroseconds();
+  struct timespec abs_timeout;
+  MonoDelta::NanosToTimeSpec(deadline_micros * MonoTime::kNanosecondsPerMicrosecond,
+                             &abs_timeout);
+
+  for (;;) {
+    if (sem_timedwait(&sem_, &abs_timeout) == 0) {
+      return true;
+    }
+    switch (errno) {
+      case ETIMEDOUT:
+        return false;
+      case EINTR:
+        continue;
+      default:
+        Fatal("timedwait");
+    }
+  }
+}
+
+// Returns one unit to the semaphore; a failing sem_post() is fatal (PCHECK).
+void Semaphore::Release() {
+  PCHECK(sem_post(&sem_) == 0);
+}
+
+// Returns the current count as reported by sem_getvalue(); the value may be
+// stale by the time the caller uses it if other threads are active.
+int Semaphore::GetValue() {
+  int val;
+  PCHECK(sem_getvalue(&sem_, &val) == 0);
+  return val;
+}
+
+// Logs the failed action together with errno (via PLOG) and terminates.
+void Semaphore::Fatal(const char* action) {
+  void* const sem_addr = static_cast<void*>(&sem_);
+  PLOG(FATAL) << "Could not " << action << " semaphore " << sem_addr;
+  abort(); // unnecessary, but avoids gcc complaining
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/semaphore.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/semaphore.h b/be/src/kudu/util/semaphore.h
new file mode 100644
index 0000000..88a1086
--- /dev/null
+++ b/be/src/kudu/util/semaphore.h
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_SEMAPHORE_H
+#define KUDU_UTIL_SEMAPHORE_H
+
+#include <semaphore.h>
+#if defined(__APPLE__)
+#include <dispatch/dispatch.h>
+#include "kudu/util/atomic.h"
+#endif  // define(__APPLE__)
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/monotime.h"
+
+namespace kudu {
+
+// Counting semaphore. Backed by a POSIX sem_t everywhere except macOS, where
+// a libdispatch semaphore is used instead.
+class Semaphore {
+ public:
+  // Creates the semaphore with an initial count of 'capacity'.
+  explicit Semaphore(int capacity);
+  ~Semaphore();
+
+  // Blocks until the semaphore is acquired.
+  void Acquire();
+
+  // Acquires the semaphore only if that is possible right now; returns
+  // false otherwise.
+  bool TryAcquire();
+
+  // Waits at most 'timeout' to acquire the semaphore; returns true on
+  // success and false if the timeout elapsed first.
+  bool TimedAcquire(const MonoDelta& timeout);
+
+  // Releases the semaphore (increments its count).
+  void Release();
+
+  // Returns the semaphore's current count.
+  int GetValue();
+
+  // Boost-compatible lock()/try_lock()/unlock() aliases.
+  void lock() { Acquire(); }
+  bool try_lock() { return TryAcquire(); }
+  void unlock() { Release(); }
+
+ private:
+#if defined(__APPLE__)
+  dispatch_semaphore_t sem_;
+  // Count maintained by the macOS implementation so GetValue() can report it.
+  AtomicInt<int32_t> count_;
+#else
+  // Logs a fatal error message (with errno) for the failed 'action'. Kept
+  // out of line so the main functions stay as small as possible.
+  void Fatal(const char* action) ATTRIBUTE_NORETURN;
+
+  sem_t sem_;
+#endif  // defined(__APPLE__)
+
+  DISALLOW_COPY_AND_ASSIGN(Semaphore);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_SEMAPHORE_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/semaphore_macosx.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/semaphore_macosx.cc b/be/src/kudu/util/semaphore_macosx.cc
new file mode 100644
index 0000000..7cc5f08
--- /dev/null
+++ b/be/src/kudu/util/semaphore_macosx.cc
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/semaphore.h"
+
+#include <semaphore.h>
+#include <glog/logging.h>
+#include "kudu/gutil/walltime.h"
+
+namespace kudu {
+
+// Creates a libdispatch semaphore with an initial count of 'capacity'. It also
+// seeds count_, which GetValue() reports, with the same value. A NULL result
+// from dispatch_semaphore_create() is fatal (CHECK_NOTNULL).
+Semaphore::Semaphore(int capacity)
+  : count_(capacity) {
+  DCHECK_GE(capacity, 0);
+  sem_ = dispatch_semaphore_create(capacity);
+  CHECK_NOTNULL(sem_);
+}
+
+// Releases this object's reference to the dispatch semaphore.
+Semaphore::~Semaphore() {
+  dispatch_release(sem_);
+}
+
+// Blocks until the semaphore is acquired, then decrements the count_ that
+// GetValue() reports.
+void Semaphore::Acquire() {
+  // If the timeout is DISPATCH_TIME_FOREVER, then dispatch_semaphore_wait()
+  // waits forever and always returns zero.
+  CHECK(dispatch_semaphore_wait(sem_, DISPATCH_TIME_FOREVER) == 0);
+  count_.IncrementBy(-1);
+}
+
+// Non-blocking acquire: waits with a DISPATCH_TIME_NOW deadline. The
+// dispatch_semaphore_wait() call returns zero on success and non-zero once
+// that (immediate) timeout expires. count_ is decremented only on success.
+bool Semaphore::TryAcquire() {
+  const bool acquired = dispatch_semaphore_wait(sem_, DISPATCH_TIME_NOW) == 0;
+  if (acquired) {
+    count_.IncrementBy(-1);
+  }
+  return acquired;
+}
+
+// Waits until 'timeout' from now to acquire the semaphore. Returns true (and
+// decrements count_) on success, false if the deadline passed first.
+bool Semaphore::TimedAcquire(const MonoDelta& timeout) {
+  const dispatch_time_t deadline =
+      dispatch_time(DISPATCH_TIME_NOW, timeout.ToNanoseconds());
+  if (dispatch_semaphore_wait(sem_, deadline) != 0) {
+    return false;  // Timed out.
+  }
+  count_.IncrementBy(-1);
+  return true;
+}
+
+// Releases the semaphore and increments the count_ that GetValue() reports.
+void Semaphore::Release() {
+  // Bump count_ BEFORE signalling. Acquirers decrement count_ only after their
+  // wait succeeds, which requires a prior signal. Incrementing first therefore
+  // keeps count_ non-negative. With the opposite order, a waiter woken by the
+  // signal could decrement before this increment lands, and a concurrent
+  // GetValue() would observe a negative value.
+  count_.IncrementBy(1);
+  dispatch_semaphore_signal(sem_);
+}
+
+// Reports the count_ kept alongside the dispatch semaphore (updated by
+// Acquire/TryAcquire/TimedAcquire/Release).
+int Semaphore::GetValue() {
+  const int32_t snapshot = count_.Load();
+  return snapshot;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/signal.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/signal.cc b/be/src/kudu/util/signal.cc
new file mode 100644
index 0000000..2de3000
--- /dev/null
+++ b/be/src/kudu/util/signal.cc
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/signal.h"
+
+#include "kudu/util/logging.h"
+
+namespace kudu {
+
+// Installs 'handler' as the disposition for 'signal' via sigaction(), with an
+// empty sa_mask and no flags. A failing sigaction() is fatal (PCHECK).
+void SetSignalHandler(int signal, SignalHandlerCallback handler) {
+  // Value-initialize so every member of 'struct sigaction' starts out zeroed,
+  // including platform-specific fields not assigned below (e.g. glibc's
+  // sa_restorer), instead of holding stack garbage.
+  struct sigaction act = {};
+  act.sa_handler = handler;
+  sigemptyset(&act.sa_mask);
+  act.sa_flags = 0;
+  PCHECK(sigaction(signal, &act, nullptr) == 0);
+}
+
+// Sets SIGPIPE's disposition to SIG_IGN.
+void IgnoreSigPipe() {
+  const SignalHandlerCallback ignore = SIG_IGN;
+  SetSignalHandler(SIGPIPE, ignore);
+}
+
+// Restores SIGPIPE's disposition to SIG_DFL.
+void ResetSigPipeHandlerToDefault() {
+  const SignalHandlerCallback default_action = SIG_DFL;
+  SetSignalHandler(SIGPIPE, default_action);
+}
+
+// Removes every signal from the calling context's blocked-signal mask. The
+// mask is inherited, so callers use this to start from a clean slate.
+// Failures of sigfillset()/sigprocmask() are fatal (PCHECK).
+void ResetAllSignalMasksToUnblocked() {
+  sigset_t signals;
+  PCHECK(sigfillset(&signals) == 0);
+  PCHECK(sigprocmask(SIG_UNBLOCK, &signals, nullptr) == 0);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/signal.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/signal.h b/be/src/kudu/util/signal.h
new file mode 100644
index 0000000..0c88a80
--- /dev/null
+++ b/be/src/kudu/util/signal.h
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <signal.h>
+
+namespace kudu {
+
+// Signature of a plain (non-SA_SIGINFO) signal handler. Spelled out directly
+// rather than via the GNU-only 'sighandler_t' or BSD-only 'sig_t' typedefs.
+// Both name this same type, so this is portable and ABI-identical.
+typedef void (*SignalHandlerCallback)(int);
+
+// Set a process-wide signal handler.
+void SetSignalHandler(int signal, SignalHandlerCallback handler);
+
+// Set the disposition of SIGPIPE to SIG_IGN.
+void IgnoreSigPipe();
+
+// Set the disposition of SIGPIPE to SIG_DFL.
+void ResetSigPipeHandlerToDefault();
+
+// Unblock all signals in the (inherited) signal mask.
+void ResetAllSignalMasksToUnblocked();
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/slice-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/slice-test.cc b/be/src/kudu/util/slice-test.cc
new file mode 100644
index 0000000..b0d3bef
--- /dev/null
+++ b/be/src/kudu/util/slice-test.cc
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/slice.h"
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/map-util.h"
+
+using std::string;
+
+namespace kudu {
+
+typedef SliceMap<int>::type MySliceMap;
+
+// Checks that a SliceMap orders its Slice keys by content regardless of
+// insertion order, via both range-for and explicit iterators.
+TEST(SliceTest, TestSliceMap) {
+  MySliceMap my_map;
+  Slice a("a");
+  Slice b("b");
+  Slice c("c");
+
+  // Insertion is deliberately out-of-order; the map should restore order.
+  InsertOrDie(&my_map, c, 3);
+  InsertOrDie(&my_map, a, 1);
+  InsertOrDie(&my_map, b, 2);
+
+  // Expected keys are built from a single char so the comparison does not
+  // depend on host byte order. Taking the first byte of an int would only
+  // yield 'a' on little-endian machines.
+  int expected_value = 0;
+  for (const MySliceMap::value_type& pair : my_map) {
+    const char expected_key = static_cast<char>('a' + expected_value++);
+    ASSERT_EQ(Slice(&expected_key, 1), pair.first);
+    ASSERT_EQ(expected_value, pair.second);
+  }
+  ASSERT_EQ(3, expected_value);
+
+  expected_value = 0;
+  for (auto iter = my_map.begin(); iter != my_map.end(); ++iter) {
+    const char expected_key = static_cast<char>('a' + expected_value++);
+    ASSERT_EQ(Slice(&expected_key, 1), iter->first);
+    ASSERT_EQ(expected_value, iter->second);
+  }
+  ASSERT_EQ(3, expected_value);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/slice.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/slice.cc b/be/src/kudu/util/slice.cc
new file mode 100644
index 0000000..7e6ab50
--- /dev/null
+++ b/be/src/kudu/util/slice.cc
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/slice.h"
+
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/util/status.h"
+#include "kudu/util/logging.h"
+
+namespace kudu {
+
+// Returns OK when this slice is exactly 'expected_size' bytes long. Otherwise
+// returns a Corruption status carrying a (redacted) preview of up to 100
+// bytes of the data.
+Status Slice::check_size(size_t expected_size) const {
+  if (PREDICT_TRUE(size() == expected_size)) {
+    return Status::OK();
+  }
+  return Status::Corruption(
+      StringPrintf("Unexpected Slice size. Expected %zu but got %zu.",
+                   expected_size, size()),
+      KUDU_REDACT(ToDebugString(100)));
+}
+
+// Returns an owning std::string holding a copy of the referenced bytes.
+std::string Slice::ToString() const {
+  const char* const begin = reinterpret_cast<const char *>(data_);
+  return std::string(begin, begin + size_);
+}
+
+// Renders the slice for debugging. Bytes for which isgraph() is true are
+// copied verbatim; every other byte becomes a 4-character "\xNN" escape
+// (lowercase hex). When 'max_len' is non-zero and the slice is longer, only
+// the first 'max_len' bytes are rendered, followed by "...<N bytes total>".
+std::string Slice::ToDebugString(size_t max_len) const {
+  size_t bytes_to_print = size_;
+  bool abbreviated = false;
+  if (max_len != 0 && bytes_to_print > max_len) {
+    bytes_to_print = max_len;
+    abbreviated = true;
+  }
+
+  // Pre-compute the output length so the string is allocated once. size_t
+  // (not int) avoids overflow and signed/unsigned mixing on huge slices.
+  size_t size = 0;
+  for (size_t i = 0; i < bytes_to_print; i++) {
+    size += isgraph(data_[i]) ? 1 : 4;
+  }
+  if (abbreviated) {
+    size += 20;  // extra padding for the "...<N bytes total>" trailer
+  }
+
+  static const char kHexDigits[] = "0123456789abcdef";
+  std::string ret;
+  ret.reserve(size);
+  for (size_t i = 0; i < bytes_to_print; i++) {
+    const uint8_t c = data_[i];
+    if (isgraph(c)) {
+      ret.push_back(static_cast<char>(c));
+    } else {
+      // Same output as "\\x%02x", without a printf call per byte.
+      ret.push_back('\\');
+      ret.push_back('x');
+      ret.push_back(kHexDigits[c >> 4]);
+      ret.push_back(kHexDigits[c & 0xf]);
+    }
+  }
+  if (abbreviated) {
+    // size_ is a size_t, so %zu (not %zd) is the matching conversion.
+    StringAppendF(&ret, "...<%zu bytes total>", size_);
+  }
+  return ret;
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/slice.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/slice.h b/be/src/kudu/util/slice.h
new file mode 100644
index 0000000..8738a5d
--- /dev/null
+++ b/be/src/kudu/util/slice.h
@@ -0,0 +1,317 @@
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+
+#ifndef KUDU_UTIL_SLICE_H_
+#define KUDU_UTIL_SLICE_H_
+
+#include <assert.h>
+#include <map>
+#include <stddef.h>
+#include <stdint.h>
+#include <string.h>
+#include <string>
+
+#ifdef KUDU_HEADERS_USE_RICH_SLICE
+#include "kudu/gutil/strings/fastmem.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/util/faststring.h"
+#endif
+#include "kudu/util/kudu_export.h"
+
+namespace kudu {
+
+class Status;
+
+/// @brief A non-owning view over a run of bytes stored elsewhere.
+///
+/// A Slice consists only of a pointer into external storage and a length.
+/// Whoever uses a Slice must make sure that storage is still alive; a Slice
+/// must not be used after the bytes it points at have been deallocated.
+///
+/// Several threads may call const methods on the same Slice concurrently
+/// without external synchronization. If any thread may call a non-const
+/// method, every thread accessing that Slice must synchronize externally.
+///
+/// Implicit conversions exist from std::string and C strings. When
+/// KUDU_HEADERS_USE_RICH_SLICE is defined, they also exist from faststring and
+/// StringPiece; both of those depend on a great deal of gutil code.
+class KUDU_EXPORT Slice {
+ public:
+  /// Construct an empty slice.
+  Slice()
+    : data_(reinterpret_cast<const uint8_t *>("")),
+      size_(0) {
+  }
+
+  /// Construct a slice over a @c uint8_t byte array.
+  ///
+  /// @param [in] d
+  ///   Start of the referenced bytes.
+  /// @param [in] n
+  ///   Number of bytes referenced.
+  Slice(const uint8_t* d, size_t n)
+    : data_(d),
+      size_(n) {
+  }
+
+  /// Construct a slice over a @c char byte array.
+  ///
+  /// @param [in] d
+  ///   Start of the referenced bytes.
+  /// @param [in] n
+  ///   Number of bytes referenced.
+  Slice(const char* d, size_t n)
+    : data_(reinterpret_cast<const uint8_t *>(d)),
+      size_(n) {
+  }
+
+  /// Construct a slice over the current contents of a string.
+  ///
+  /// @param [in] s
+  ///   The string whose bytes are referenced.
+  Slice(const std::string& s) // NOLINT(runtime/explicit)
+    : data_(reinterpret_cast<const uint8_t *>(s.data())),
+      size_(s.size()) {
+  }
+
+  /// Construct a slice over the C-string s[0, strlen(s)-1] (the terminating
+  /// NUL is not included).
+  ///
+  /// @param [in] s
+  ///   The NUL-terminated input string.
+  Slice(const char* s) // NOLINT(runtime/explicit)
+    : data_(reinterpret_cast<const uint8_t *>(s)),
+      size_(strlen(s)) {
+  }
+
+#ifdef KUDU_HEADERS_USE_RICH_SLICE
+  /// Construct a slice over the contents of a faststring.
+  ///
+  /// @note Later appends to the faststring may invalidate this slice.
+  ///
+  /// @param [in] s
+  ///   The faststring whose bytes are referenced.
+  Slice(const faststring &s) // NOLINT(runtime/explicit)
+    : data_(s.data()),
+      size_(s.size()) {
+  }
+
+  /// Construct a slice over the contents of a StringPiece.
+  ///
+  /// @param [in] s
+  ///   The StringPiece whose bytes are referenced.
+  Slice(const StringPiece& s) // NOLINT(runtime/explicit)
+    : data_(reinterpret_cast<const uint8_t*>(s.data())),
+      size_(s.size()) {
+  }
+#endif
+
+  /// @return Read-only pointer to the first referenced byte.
+  const uint8_t* data() const {
+    return data_;
+  }
+
+  /// @return Writable pointer to the first referenced byte.
+  uint8_t *mutable_data() {
+    return const_cast<uint8_t *>(data_);
+  }
+
+  /// @return Number of referenced bytes.
+  size_t size() const {
+    return size_;
+  }
+
+  /// @return @c true iff the slice references zero bytes.
+  bool empty() const {
+    return size() == 0;
+  }
+
+  /// @pre n < size()
+  ///
+  /// @param [in] n
+  ///   Index of the requested byte.
+  /// @return The byte at position @c n.
+  const uint8_t &operator[](size_t n) const {
+    assert(n < size());
+    return *(data_ + n);
+  }
+
+  /// Make this slice reference an empty array.
+  void clear() {
+    size_ = 0;
+    data_ = reinterpret_cast<const uint8_t *>("");
+  }
+
+  /// Drop the first @c n bytes from the view.
+  ///
+  /// @pre n <= size()
+  ///
+  /// @note Only the slice's pointer and length change; the underlying bytes
+  ///   are not modified.
+  ///
+  /// @param [in] n
+  ///   Number of leading bytes to drop.
+  void remove_prefix(size_t n) {
+    assert(n <= size());
+    size_ -= n;
+    data_ += n;
+  }
+
+  /// Shorten the view to its first @c n bytes.
+  ///
+  /// @pre n <= size()
+  ///
+  /// @note Only the slice's length changes; the underlying bytes are not
+  ///   modified.
+  ///
+  /// @param [in] n
+  ///   The new length of the slice.
+  void truncate(size_t n) {
+    assert(n <= size());
+    size_ = n;
+  }
+
+  /// Verify the slice length.
+  ///
+  /// @param [in] expected_size
+  /// @return Status::Corruption() iff size() != @c expected_size
+  Status check_size(size_t expected_size) const;
+
+  /// @return An owning copy of the referenced bytes.
+  std::string ToString() const;
+
+  /// Build a printable rendering of the referenced bytes.
+  ///
+  /// @param [in] max_len
+  ///   Upper bound on how many bytes are rendered; @c 0 means unlimited.
+  /// @return The printable rendering.
+  std::string ToDebugString(size_t max_len = 0) const;
+
+  /// Three-way comparison of the referenced bytes.
+  ///
+  /// @param [in] b
+  ///   The slice to compare against.
+  /// @return
+  ///   @li <  0 iff "*this" <  "b"
+  ///   @li == 0 iff "*this" == "b"
+  ///   @li >  0 iff "*this" >  "b"
+  int compare(const Slice& b) const;
+
+  /// Test for a prefix.
+  /// @param [in] x
+  ///   The candidate prefix.
+  /// @return @c true iff "x" is a prefix of "*this"
+  bool starts_with(const Slice& x) const {
+    if (size_ < x.size_) {
+      return false;
+    }
+    return MemEqual(data_, x.data_, x.size_);
+  }
+
+  /// @brief Strict-weak-ordering functor for ordered containers (e.g. STL maps).
+  struct Comparator {
+    /// Order two slices by Slice::compare().
+    ///
+    /// @param [in] a
+    ///   Left-hand slice; Slice::compare() is invoked on it.
+    /// @param [in] b
+    ///   Right-hand slice, passed to Slice::compare().
+    /// @return @c true iff @c a sorts before @c b.
+    bool operator()(const Slice& a, const Slice& b) const {
+      return a.compare(b) < 0;
+    }
+  };
+
+  /// Copy the referenced bytes to @c d and make the slice point there.
+  ///
+  /// @param [in] d
+  ///   Destination buffer, assumed large enough for size() bytes. If it is
+  ///   already the slice's location, nothing is copied.
+  void relocate(uint8_t* d) {
+    if (data_ == d) {
+      return;
+    }
+    memcpy(d, data_, size_);
+    data_ = d;
+  }
+
+ private:
+  friend bool operator==(const Slice& x, const Slice& y);
+
+  // Byte-wise equality of the first n bytes: gutil's memeq with the rich-slice
+  // headers, plain memcmp otherwise.
+  static bool MemEqual(const void* a, const void* b, size_t n) {
+#ifdef KUDU_HEADERS_USE_RICH_SLICE
+    return strings::memeq(a, b, n);
+#else
+    return memcmp(a, b, n) == 0;
+#endif
+  }
+
+  // Three-way byte comparison of the first n bytes: gutil's inlined fast
+  // memcmp with the rich-slice headers, plain memcmp otherwise.
+  static int MemCompare(const void* a, const void* b, size_t n) {
+#ifdef KUDU_HEADERS_USE_RICH_SLICE
+    return strings::fastmemcmp_inlined(a, b, n);
+#else
+    return memcmp(a, b, n);
+#endif
+  }
+
+  const uint8_t* data_;  // First referenced byte (storage is not owned).
+  size_t size_;          // Number of referenced bytes.
+
+  // Intentionally copyable
+};
+
+/// Byte-for-byte equality of two slices.
+///
+/// @param [in] x
+///   First slice.
+/// @param [in] y
+///   Second slice.
+/// @return @c true iff both slices have the same length and identical bytes.
+inline bool operator==(const Slice& x, const Slice& y) {
+  if (x.size() != y.size()) {
+    return false;
+  }
+  return Slice::MemEqual(x.data(), y.data(), x.size());
+}
+
+/// Negation of operator== for slices.
+///
+/// @param [in] x
+///   First slice.
+/// @param [in] y
+///   Second slice.
+/// @return @c true iff the slices differ in length or content.
+inline bool operator!=(const Slice& x, const Slice& y) {
+  const bool same = (x == y);
+  return !same;
+}
+
+/// Stream a printable rendering of a slice. Only the first 16 bytes are
+/// rendered (see Slice::ToDebugString()).
+///
+/// @param [out] o
+///   Destination stream.
+/// @param [in] s
+///   The slice to print.
+/// @return @c o, for chaining.
+inline std::ostream& operator<<(std::ostream& o, const Slice& s) {
+  const std::string printable = s.ToDebugString(16);
+  return o << printable;
+}
+
+// Three-way comparison: bytes of the common prefix are compared first; if
+// they are equal, the shorter slice orders first.
+inline int Slice::compare(const Slice& b) const {
+  // Keep the prefix length as size_t. Narrowing it to int (as before) truncates
+  // or goes negative for slices over INT_MAX bytes, and that value would be
+  // converted back to a huge size_t when handed to MemCompare().
+  const size_t min_len = (size_ < b.size_) ? size_ : b.size_;
+  int r = MemCompare(data_, b.data_, min_len);
+  if (r == 0) {
+    if (size_ < b.size_) r = -1;
+    else if (size_ > b.size_) r = +1;
+  }
+  return r;
+}
+
+/// @brief An ordered std::map keyed by Slice (ordered via Slice::Comparator).
+///
+/// Example:
+/// @code
+///   typedef SliceMap<int>::type MySliceMap;
+///
+///   MySliceMap my_map;
+///   my_map.insert(MySliceMap::value_type(a, 1));
+///   my_map.insert(MySliceMap::value_type(b, 2));
+///   my_map.insert(MySliceMap::value_type(c, 3));
+///
+///   for (const MySliceMap::value_type& pair : my_map) {
+///     ...
+///   }
+/// @endcode
+template <typename T>
+struct SliceMap {
+  /// The map type: Slice keys, @c T values, ordered by Slice::Comparator.
+  using type = std::map<Slice, T, Slice::Comparator>;
+};
+
+}  // namespace kudu
+
+#endif  // KUDU_UTIL_SLICE_H_

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/spinlock_profiling-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/spinlock_profiling-test.cc b/be/src/kudu/util/spinlock_profiling-test.cc
new file mode 100644
index 0000000..4960c0f
--- /dev/null
+++ b/be/src/kudu/util/spinlock_profiling-test.cc
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <glog/logging.h>
+#include <strstream>
+
+#include "kudu/gutil/spinlock.h"
+#include "kudu/util/spinlock_profiling.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/trace.h"
+
+// Can't include gutil/synchronization_profiling.h directly: it would declare
+// a weak symbol directly in this unit test, and the runtime linker prefers it
+// over equivalent strong symbols for some reason. Declaring the symbols here
+// without providing an empty definition means the strong symbols are chosen
+// when provided via shared libraries.
+//
+// Further reading:
+// - http://stackoverflow.com/questions/20658809/dynamic-loading-and-weak-symbol-resolution
+// - http://notmysock.org/blog/php/weak-symbols-arent.html
+namespace gutil {
+extern void SubmitSpinLockProfileData(const void* contended_lock,
+                                      int64 wait_cycles);
+} // namespace gutil
+
+namespace base {
+extern void SubmitSpinLockProfileData(const void* contended_lock,
+                                      int64 wait_cycles);
+} // namespace base
+
+namespace kudu {
+
+// Fixture with no extra state; exists to pick up KuduTest's setup.
+class SpinLockProfilingTest : public KuduTest {
+};
+
+// A contention report submitted while a trace is adopted should show up in
+// that trace's metrics and message, and in the global contention counter.
+TEST_F(SpinLockProfilingTest, TestSpinlockProfiling) {
+  scoped_refptr<Trace> trace(new Trace);
+  base::SpinLock lock;
+  {
+    ADOPT_TRACE(trace.get());
+    gutil::SubmitSpinLockProfileData(&lock, 4000000);
+  }
+
+  const string result = trace->DumpToString();
+  LOG(INFO) << "trace: " << result;
+  ASSERT_STR_CONTAINS(result, "\"spinlock_wait_cycles\":4000000");
+  // Only loose checks on the message: CyclesPerSecond differs between
+  // machines, so the reported wait time is not predictable.
+  ASSERT_STR_CONTAINS(result, "Waited ");
+  ASSERT_STR_CONTAINS(result, "on lock ");
+
+  ASSERT_GT(GetSpinLockContentionMicros(), 0);
+}
+
+// With synchronization profiling on, a contention report should be captured
+// as a stack entry ("<cycles>\t1 @ ...") in the flushed profile, with nothing
+// dropped.
+TEST_F(SpinLockProfilingTest, TestStackCollection) {
+  StartSynchronizationProfiling();
+  base::SpinLock lock;
+  gutil::SubmitSpinLockProfileData(&lock, 12345);
+  StopSynchronizationProfiling();
+
+  int64_t dropped = 0;
+  std::ostringstream profile_out;
+  FlushSynchronizationProfile(&profile_out, &dropped);
+
+  const string s = profile_out.str();
+  ASSERT_STR_CONTAINS(s, "12345\t1 @ ");
+  ASSERT_EQ(0, dropped);
+}
+
+// Submits a contention report through the 'base' namespace entry point (the
+// one declared alongside gutil's above) with profiling enabled, then reads
+// GetTcmallocContentionMicros().
+// NOTE(review): the assertion only checks the counter is non-negative, and
+// profiling is left enabled afterwards (TestStackCollection stops it) --
+// confirm that is intended.
+TEST_F(SpinLockProfilingTest, TestTcmallocContention) {
+  StartSynchronizationProfiling();
+  base::SubmitSpinLockProfileData(nullptr, 12345);
+  ASSERT_GE(GetTcmallocContentionMicros(), 0);
+}
+
+} // namespace kudu



Mime
View raw message