From: tarasbob@apache.org
To: commits@impala.incubator.apache.org
Date: Sat, 17 Jun 2017 07:25:32 -0000
Subject: [06/30] incubator-impala git commit: IMPALA-4669: [KUTIL] Import kudu_util library from kudu@314c9d8

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rolling_log.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rolling_log.cc b/be/src/kudu/util/rolling_log.cc
new file mode 100644
index 0000000..486abc0
--- /dev/null
+++ b/be/src/kudu/util/rolling_log.cc
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/rolling_log.h"
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+
+#include
+
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/env.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/thread_restrictions.h"
+#include "kudu/util/user.h"
+
+using std::ostringstream;
+using std::setw;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+
+static const int kDefaultSizeLimitBytes = 64 * 1024 * 1024; // 64MB
+
+namespace kudu {
+
+RollingLog::RollingLog(Env* env, string log_dir, string log_name)
+    : env_(env),
+      log_dir_(std::move(log_dir)),
+      log_name_(std::move(log_name)),
+      size_limit_bytes_(kDefaultSizeLimitBytes),
+      compress_after_close_(true) {}
+
+RollingLog::~RollingLog() {
+  WARN_NOT_OK(Close(), "Unable to close RollingLog");
+}
+
+void RollingLog::SetSizeLimitBytes(int64_t size) {
+  CHECK_GT(size, 0);
+  size_limit_bytes_ = size;
+}
+
+void RollingLog::SetCompressionEnabled(bool compress) {
+  compress_after_close_ = compress;
+}
+
+string RollingLog::GetLogFileName(int sequence) const {
+  ostringstream str;
+
+  // 1. Program name.
+  str << google::ProgramInvocationShortName();
+
+  // 2. Host name.
+  string hostname;
+  Status s = GetHostname(&hostname);
+  if (!s.ok()) {
+    hostname = "unknown_host";
+  }
+  str << "." << hostname;
+
+  // 3. User name.
+  string user_name;
+  s = GetLoggedInUser(&user_name);
+  if (!s.ok()) {
+    user_name = "unknown_user";
+  }
+  str << "." << user_name;
+
+  // 4. Log name.
+  str << "." << log_name_;
+
+  // 5. Timestamp.
+  // Implementation cribbed from glog/logging.cc
+  time_t time = static_cast(WallTime_Now());
+  struct ::tm tm_time;
+  localtime_r(&time, &tm_time);
+
+  str << ".";
+  str.fill('0');
+  str << 1900+tm_time.tm_year
+      << setw(2) << 1+tm_time.tm_mon
+      << setw(2) << tm_time.tm_mday
+      << '-'
+      << setw(2) << tm_time.tm_hour
+      << setw(2) << tm_time.tm_min
+      << setw(2) << tm_time.tm_sec;
+  str.clear(); // resets formatting flags
+
+  // 6. Sequence number.
+  str << "." << sequence;
+
+  // 7. Pid.
+  str << "." << getpid();
+
+  return str.str();
+}
+
+Status RollingLog::Open() {
+  CHECK(!file_);
+
+  for (int sequence = 0; ; sequence++) {
+
+    string path = JoinPathSegments(log_dir_,
+                                   GetLogFileName(sequence));
+
+    WritableFileOptions opts;
+    // Logs aren't worth the performance cost of durability.
+    opts.sync_on_close = false;
+    opts.mode = Env::CREATE_NON_EXISTING;
+
+    Status s = env_->NewWritableFile(opts, path, &file_);
+    if (s.IsAlreadyPresent()) {
+      // We already rolled once at this same timestamp.
+      // Try again with a new sequence number.
+      continue;
+    }
+    RETURN_NOT_OK(s);
+
+    VLOG(1) << "Rolled " << log_name_ << " log to new file: " << path;
+    break;
+  }
+  return Status::OK();
+}
+
+Status RollingLog::Close() {
+  if (!file_) {
+    return Status::OK();
+  }
+  string path = file_->filename();
+  RETURN_NOT_OK_PREPEND(file_->Close(),
+                        Substitute("Unable to close $0", path));
+  file_.reset();
+  if (compress_after_close_) {
+    WARN_NOT_OK(CompressFile(path), "Unable to compress old log file");
+  }
+  return Status::OK();
+}
+
+Status RollingLog::Append(StringPiece s) {
+  if (!file_) {
+    RETURN_NOT_OK_PREPEND(Open(), "Unable to open log");
+  }
+
+  if (file_->Size() + s.size() > size_limit_bytes_) {
+    RETURN_NOT_OK_PREPEND(Close(), "Unable to roll log");
+    RETURN_NOT_OK_PREPEND(Open(), "Unable to roll log");
+  }
+  RETURN_NOT_OK(file_->Append(s));
+  return Status::OK();
+}
+
+namespace {
+
+Status GzClose(gzFile f) {
+  int err = gzclose(f);
+  switch (err) {
+    case Z_OK:
+      return Status::OK();
+    case Z_STREAM_ERROR:
+      return Status::InvalidArgument("Stream not valid");
+    case Z_ERRNO:
+      return Status::IOError("IO Error closing stream");
+    case Z_MEM_ERROR:
+      return Status::RuntimeError("Out of memory");
+    case Z_BUF_ERROR:
+      return Status::IOError("read ended in the middle of a stream");
+    default:
+      return Status::IOError("Unknown zlib error", SimpleItoa(err));
+  }
+}
+
+class ScopedGzipCloser {
+ public:
+  explicit ScopedGzipCloser(gzFile f)
+    : file_(f) {
+  }
+
+  ~ScopedGzipCloser() {
+    if (file_) {
+      WARN_NOT_OK(GzClose(file_), "Unable to close gzip stream");
+    }
+  }
+
+  void Cancel() {
+    file_ = nullptr;
+  }
+
+ private:
+  gzFile file_;
+};
+} // anonymous namespace
+
+// We implement CompressFile() manually using zlib APIs rather than forking
+// out to '/bin/gzip' since fork() can be expensive on processes that use a large
+// amount of memory. During the time of the fork, other threads could end up
+// blocked. Implementing it using the zlib stream APIs isn't too much code
+// and is less likely to be problematic.
+Status RollingLog::CompressFile(const std::string& path) const {
+  unique_ptr in_file;
+  RETURN_NOT_OK_PREPEND(env_->NewSequentialFile(path, &in_file),
+                        "Unable to open input file to compress");
+
+  string gz_path = path + ".gz";
+  gzFile gzf = gzopen(gz_path.c_str(), "w");
+  if (!gzf) {
+    return Status::IOError("Unable to open gzip stream");
+  }
+
+  ScopedGzipCloser closer(gzf);
+
+  // Loop reading data from the input file and writing to the gzip stream.
+  uint8_t buf[32 * 1024];
+  while (true) {
+    Slice result(buf, arraysize(buf));
+    RETURN_NOT_OK_PREPEND(in_file->Read(&result),
+                          "Unable to read from gzip input");
+    if (result.size() == 0) {
+      break;
+    }
+    int n = gzwrite(gzf, result.data(), result.size());
+    if (n == 0) {
+      int errnum;
+      return Status::IOError("Unable to write to gzip output",
+                             gzerror(gzf, &errnum));
+    }
+  }
+  closer.Cancel();
+  RETURN_NOT_OK_PREPEND(GzClose(gzf),
+                        "Unable to close gzip output");
+
+  WARN_NOT_OK(env_->DeleteFile(path),
+              "Unable to delete gzip input file after compression");
+  return Status::OK();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rolling_log.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rolling_log.h b/be/src/kudu/util/rolling_log.h
new file mode 100644
index 0000000..2559d6d
--- /dev/null
+++ b/be/src/kudu/util/rolling_log.h
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_ROLLING_LOG_H
+#define KUDU_UTIL_ROLLING_LOG_H
+
+#include
+#include
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Env;
+class WritableFile;
+
+// A simple rolling log.
+//
+// This creates a log which spans multiple files in a specified directory.
+// After a log file reaches a specified size threshold, it automatically rolls
+// to the next file in the sequence.
+//
+// The files are named similarly to glog log files and use the following pattern:
+//
+//   /......
+//   log_dir: the log_dir specified in the constructor
+//   program-name: argv[0], as determined by google::ProgramInvocationShortName()
+//   hostname: the local machine hostname
+//   user-name: the current user name
+//   log-name: the log_name specified in the constructor
+//   timestamp: the wall clock time when the log file was created, in
+//              YYYYmmdd-HHMMSS fixed-length format.
+//   sequence: a sequence number which is used to disambiguate when the log file is
+//             rolled multiple times within a second
+//   pid: the pid of the daemon
+//
+// The log implementation does not ensure durability of the log or its files in any way.
+// This class is not thread-safe and must be externally synchronized.
+class RollingLog {
+ public:
+  RollingLog(Env* env, std::string log_dir, std::string log_name);
+
+  ~RollingLog();
+
+  // Open the log.
+  // It is optional to call this function. Append() will automatically open
+  // the log as necessary if it is not open.
+  Status Open();
+
+  // Set the size limit for the current and any future log files.
+  //
+  // There is no limit on the total number of previous log segments. We rely
+  // on system utilities to clean up old logs to maintain some size limit.
+  void SetSizeLimitBytes(int64_t bytes);
+
+  // If compression is enabled, log files are compressed.
+  // NOTE: this requires that the passed-in Env instance is the local file system.
+  void SetCompressionEnabled(bool compress);
+
+  // Append the given data to the current log file.
+  //
+  // If appending this data would cross the configured file size limit, a new file
+  // is created and the data is appended there.
+  //
+  // Note that this is a synchronous API and causes potentially-blocking IO on the
+  // current thread. However, this does not fsync() or otherwise ensure durability
+  // of the appended data.
+  Status Append(StringPiece data);
+
+  // Close the log.
+  Status Close();
+
+ private:
+  std::string GetLogFileName(int sequence) const;
+
+  // Compress the given path, writing a new file '.gz'.
+  Status CompressFile(const std::string& path) const;
+
+  Env* const env_;
+  const std::string log_dir_;
+  const std::string log_name_;
+
+  int64_t size_limit_bytes_;
+
+  std::unique_ptr file_;
+  bool compress_after_close_;
+
+  DISALLOW_COPY_AND_ASSIGN(RollingLog);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_ROLLING_LOG_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rw_mutex-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_mutex-test.cc b/be/src/kudu/util/rw_mutex-test.cc
new file mode 100644
index 0000000..7f9e376
--- /dev/null
+++ b/be/src/kudu/util/rw_mutex-test.cc
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include
+#include
+#include
+#include
+
+#include "kudu/gutil/integral_types.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/rw_mutex.h"
+#include "kudu/util/test_util.h"
+
+using std::lock_guard;
+using std::thread;
+using std::try_to_lock;
+using std::unique_lock;
+using std::vector;
+
+namespace kudu {
+
+class RWMutexTest : public KuduTest,
+                    public ::testing::WithParamInterface {
+ public:
+  RWMutexTest()
+    : lock_(GetParam()) {
+  }
+ protected:
+  RWMutex lock_;
+};
+
+// Instantiate every test for each kind of RWMutex priority.
+INSTANTIATE_TEST_CASE_P(Priorities, RWMutexTest,
+                        ::testing::Values(RWMutex::Priority::PREFER_READING,
+                                          RWMutex::Priority::PREFER_WRITING));
+
+// Multi-threaded test that tries to find deadlocks in the RWMutex wrapper.
+TEST_P(RWMutexTest, TestDeadlocks) {
+  uint64_t number_of_writes = 0;
+  AtomicInt number_of_reads(0);
+
+  AtomicBool done(false);
+  vector threads;
+
+  // Start several blocking and non-blocking read-write workloads.
+  for (int i = 0; i < 2; i++) {
+    threads.emplace_back([&](){
+      while (!done.Load()) {
+        lock_guard l(lock_);
+        number_of_writes++;
+      }
+    });
+    threads.emplace_back([&](){
+      while (!done.Load()) {
+        unique_lock l(lock_, try_to_lock);
+        if (l.owns_lock()) {
+          number_of_writes++;
+        }
+      }
+    });
+  }
+
+  // Start several blocking and non-blocking read-only workloads.
+  for (int i = 0; i < 2; i++) {
+    threads.emplace_back([&](){
+      while (!done.Load()) {
+        shared_lock l(lock_);
+        number_of_reads.Increment();
+      }
+    });
+    threads.emplace_back([&](){
+      while (!done.Load()) {
+        shared_lock l(lock_, try_to_lock);
+        if (l.owns_lock()) {
+          number_of_reads.Increment();
+        }
+      }
+    });
+  }
+
+  SleepFor(MonoDelta::FromSeconds(1));
+  done.Store(true);
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  shared_lock l(lock_);
+  LOG(INFO) << "Number of writes: " << number_of_writes;
+  LOG(INFO) << "Number of reads: " << number_of_reads.Load();
+}
+
+#ifndef NDEBUG
+// Tests that the RWMutex wrapper catches basic usage errors. This checking is
+// only enabled in debug builds.
+TEST_P(RWMutexTest, TestLockChecking) {
+  EXPECT_DEATH({
+    lock_.ReadLock();
+    lock_.ReadLock();
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryReadLock());
+    CHECK(lock_.TryReadLock());
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    lock_.ReadLock();
+    lock_.WriteLock();
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryReadLock());
+    CHECK(lock_.TryWriteLock());
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    lock_.WriteLock();
+    lock_.ReadLock();
+  }, "already holding lock for writing");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryWriteLock());
+    CHECK(lock_.TryReadLock());
+  }, "already holding lock for writing");
+
+  EXPECT_DEATH({
+    lock_.WriteLock();
+    lock_.WriteLock();
+  }, "already holding lock for writing");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryWriteLock());
+    CHECK(lock_.TryWriteLock());
+  }, "already holding lock for writing");
+
+  EXPECT_DEATH({
+    lock_.ReadUnlock();
+  }, "wasn't holding lock for reading");
+
+  EXPECT_DEATH({
+    lock_.WriteUnlock();
+  }, "wasn't holding lock for writing");
+
+  EXPECT_DEATH({
+    lock_.ReadLock();
+    lock_.WriteUnlock();
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryReadLock());
+    lock_.WriteUnlock();
+  }, "already holding lock for reading");
+
+  EXPECT_DEATH({
+    lock_.WriteLock();
+    lock_.ReadUnlock();
+  }, "already holding lock for writing");
+
+  EXPECT_DEATH({
+    CHECK(lock_.TryWriteLock());
+    lock_.ReadUnlock();
+  }, "already holding lock for writing");
+}
+#endif
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rw_mutex.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_mutex.cc b/be/src/kudu/util/rw_mutex.cc
new file mode 100644
index 0000000..6dd8f3e
--- /dev/null
+++ b/be/src/kudu/util/rw_mutex.cc
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/rw_mutex.h"
+
+#include
+#include
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/util/env.h"
+
+using std::lock_guard;
+
+namespace {
+
+void unlock_rwlock(pthread_rwlock_t* rwlock) {
+  int rv = pthread_rwlock_unlock(rwlock);
+  DCHECK_EQ(0, rv) << strerror(rv);
+}
+
+} // anonymous namespace
+
+namespace kudu {
+
+RWMutex::RWMutex()
+#ifndef NDEBUG
+    : writer_tid_(0)
+#endif
+{
+  Init(Priority::PREFER_READING);
+}
+
+RWMutex::RWMutex(Priority prio)
+#ifndef NDEBUG
+    : writer_tid_(0)
+#endif
+{
+  Init(prio);
+}
+
+void RWMutex::Init(Priority prio) {
+#ifdef __linux__
+  // Adapt from priority to the pthread type.
+  int kind = PTHREAD_RWLOCK_PREFER_READER_NP;
+  switch (prio) {
+    case Priority::PREFER_READING:
+      kind = PTHREAD_RWLOCK_PREFER_READER_NP;
+      break;
+    case Priority::PREFER_WRITING:
+      kind = PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP;
+      break;
+  }
+
+  // Initialize the new rwlock with the user's preference.
+  pthread_rwlockattr_t attr;
+  int rv = pthread_rwlockattr_init(&attr);
+  DCHECK_EQ(0, rv) << strerror(rv);
+  rv = pthread_rwlockattr_setkind_np(&attr, kind);
+  DCHECK_EQ(0, rv) << strerror(rv);
+  rv = pthread_rwlock_init(&native_handle_, &attr);
+  DCHECK_EQ(0, rv) << strerror(rv);
+  rv = pthread_rwlockattr_destroy(&attr);
+  DCHECK_EQ(0, rv) << strerror(rv);
+#else
+  int rv = pthread_rwlock_init(&native_handle_, NULL);
+  DCHECK_EQ(0, rv) << strerror(rv);
+#endif
+}
+
+RWMutex::~RWMutex() {
+  int rv = pthread_rwlock_destroy(&native_handle_);
+  DCHECK_EQ(0, rv) << strerror(rv);
+}
+
+void RWMutex::ReadLock() {
+  CheckLockState(LockState::NEITHER);
+  int rv = pthread_rwlock_rdlock(&native_handle_);
+  DCHECK_EQ(0, rv) << strerror(rv);
+  MarkForReading();
+}
+
+void RWMutex::ReadUnlock() {
+  CheckLockState(LockState::READER);
+  UnmarkForReading();
+  unlock_rwlock(&native_handle_);
+}
+
+bool RWMutex::TryReadLock() {
+  CheckLockState(LockState::NEITHER);
+  int rv = pthread_rwlock_tryrdlock(&native_handle_);
+  if (rv == EBUSY) {
+    return false;
+  }
+  DCHECK_EQ(0, rv) << strerror(rv);
+  MarkForReading();
+  return true;
+}
+
+void RWMutex::WriteLock() {
+  CheckLockState(LockState::NEITHER);
+  int rv = pthread_rwlock_wrlock(&native_handle_);
+  DCHECK_EQ(0, rv) << strerror(rv);
+  MarkForWriting();
+}
+
+void RWMutex::WriteUnlock() {
+  CheckLockState(LockState::WRITER);
+  UnmarkForWriting();
+  unlock_rwlock(&native_handle_);
+}
+
+bool RWMutex::TryWriteLock() {
+  CheckLockState(LockState::NEITHER);
+  int rv = pthread_rwlock_trywrlock(&native_handle_);
+  if (rv == EBUSY) {
+    return false;
+  }
+  DCHECK_EQ(0, rv) << strerror(rv);
+  MarkForWriting();
+  return true;
+}
+
+#ifndef NDEBUG
+
+void RWMutex::AssertAcquiredForReading() const {
+  lock_guard l(tid_lock_);
+  CHECK(ContainsKey(reader_tids_, Env::Default()->gettid()));
+}
+
+void RWMutex::AssertAcquiredForWriting() const {
+  lock_guard l(tid_lock_);
+  CHECK_EQ(Env::Default()->gettid(), writer_tid_);
+}
+
+void RWMutex::CheckLockState(LockState state) const {
+  pid_t my_tid = Env::Default()->gettid();
+  bool is_reader;
+  bool is_writer;
+  {
+    lock_guard l(tid_lock_);
+    is_reader = ContainsKey(reader_tids_, my_tid);
+    is_writer = writer_tid_ == my_tid;
+  }
+
+  switch (state) {
+    case LockState::NEITHER:
+      CHECK(!is_reader) << "Invalid state, already holding lock for reading";
+      CHECK(!is_writer) << "Invalid state, already holding lock for writing";
+      break;
+    case LockState::READER:
+      CHECK(!is_writer) << "Invalid state, already holding lock for writing";
+      CHECK(is_reader) << "Invalid state, wasn't holding lock for reading";
+      break;
+    case LockState::WRITER:
+      CHECK(!is_reader) << "Invalid state, already holding lock for reading";
+      CHECK(is_writer) << "Invalid state, wasn't holding lock for writing";
+      break;
+  }
+}
+
+void RWMutex::MarkForReading() {
+  lock_guard l(tid_lock_);
+  reader_tids_.insert(Env::Default()->gettid());
+}
+
+void RWMutex::MarkForWriting() {
+  lock_guard l(tid_lock_);
+  writer_tid_ = Env::Default()->gettid();
+}
+
+void RWMutex::UnmarkForReading() {
+  lock_guard l(tid_lock_);
+  reader_tids_.erase(Env::Default()->gettid());
+}
+
+void RWMutex::UnmarkForWriting() {
+  lock_guard l(tid_lock_);
+  writer_tid_ = 0;
+}
+
+#endif
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rw_mutex.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_mutex.h b/be/src/kudu/util/rw_mutex.h
new file mode 100644
index 0000000..969f2be
--- /dev/null
+++ b/be/src/kudu/util/rw_mutex.h
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include
+#include
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/locks.h"
+
+namespace kudu {
+
+// Read/write mutex. Implemented as a thin wrapper around pthread_rwlock_t.
+//
+// Although pthread_rwlock_t allows recursive acquisition, this wrapper does
+// not, and will crash in debug mode if recursive acquisition is detected.
+class RWMutex {
+ public:
+
+  // Possible fairness policies for the RWMutex.
+  enum class Priority {
+    // The lock will prioritize readers at the expense of writers.
+    PREFER_READING,
+
+    // The lock will prioritize writers at the expense of readers.
+    //
+    // Care should be taken when using this fairness policy, as it can lead to
+    // unexpected deadlocks (e.g. a writer waiting on the lock will prevent
+    // additional readers from acquiring it).
+    PREFER_WRITING,
+  };
+
+  // Create an RWMutex that prioritizes readers.
+  RWMutex();
+
+  // Create an RWMutex with customized priority. This is a best effort; the
+  // underlying platform may not support custom priorities.
+  explicit RWMutex(Priority prio);
+
+  ~RWMutex();
+
+  void ReadLock();
+  void ReadUnlock();
+  bool TryReadLock();
+
+  void WriteLock();
+  void WriteUnlock();
+  bool TryWriteLock();
+
+#ifndef NDEBUG
+  void AssertAcquiredForReading() const;
+  void AssertAcquiredForWriting() const;
+#else
+  void AssertAcquiredForReading() const {}
+  void AssertAcquiredForWriting() const {}
+#endif
+
+  // Aliases for use with std::lock_guard and kudu::shared_lock.
+  void lock() { WriteLock(); }
+  void unlock() { WriteUnlock(); }
+  bool try_lock() { return TryWriteLock(); }
+  void lock_shared() { ReadLock(); }
+  void unlock_shared() { ReadUnlock(); }
+  bool try_lock_shared() { return TryReadLock(); }
+
+ private:
+  void Init(Priority prio);
+
+  enum class LockState {
+    NEITHER,
+    READER,
+    WRITER,
+  };
+#ifndef NDEBUG
+  void CheckLockState(LockState state) const;
+  void MarkForReading();
+  void MarkForWriting();
+  void UnmarkForReading();
+  void UnmarkForWriting();
+#else
+  void CheckLockState(LockState state) const {}
+  void MarkForReading() {}
+  void MarkForWriting() {}
+  void UnmarkForReading() {}
+  void UnmarkForWriting() {}
+#endif
+
+  pthread_rwlock_t native_handle_;
+
+#ifndef NDEBUG
+  // Protects reader_tids_ and writer_tid_.
+  mutable simple_spinlock tid_lock_;
+
+  // Tracks all current readers by tid.
+  std::unordered_set reader_tids_;
+
+  // Tracks the current writer (if one exists) by tid.
+  pid_t writer_tid_;
+#endif
+
+  DISALLOW_COPY_AND_ASSIGN(RWMutex);
+};
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rw_semaphore-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_semaphore-test.cc b/be/src/kudu/util/rw_semaphore-test.cc
new file mode 100644
index 0000000..d12fda1
--- /dev/null
+++ b/be/src/kudu/util/rw_semaphore-test.cc
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include
+#include
+#include
+#include
+
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/rw_semaphore.h"
+
+using std::thread;
+using std::vector;
+
+namespace kudu {
+struct SharedState {
+  SharedState() : done(false), int_var(0) {}
+
+  bool done;
+  int64_t int_var;
+  rw_semaphore sem;
+};
+
+// Thread which increases the value in the shared state under the write lock.
+void Writer(SharedState* state) {
+  int i = 0;
+  while (true) {
+    std::lock_guard l(state->sem);
+    state->int_var += (i++);
+    if (state->done) {
+      break;
+    }
+  }
+}
+
+// Thread which verifies that the value in the shared state only increases.
+void Reader(SharedState* state) {
+  int prev_val = 0;
+  while (true) {
+    shared_lock l(state->sem);
+    // The int var should only be seen to increase.
+    CHECK_GE(state->int_var, prev_val);
+    prev_val = state->int_var;
+    if (state->done) {
+      break;
+    }
+  }
+}
+
+// Test which verifies basic functionality of the semaphore.
+// When run under TSAN this also verifies the barriers.
+TEST(RWSemaphoreTest, TestBasicOperation) {
+  SharedState s;
+  vector threads;
+  // Start 5 readers and writers.
+  for (int i = 0; i < 5; i++) {
+    threads.push_back(new thread(Reader, &s));
+    threads.push_back(new thread(Writer, &s));
+  }
+
+  // Let them contend for a short amount of time.
+  SleepFor(MonoDelta::FromMilliseconds(50));
+
+  // Signal them to stop.
+  {
+    std::lock_guard l(s.sem);
+    s.done = true;
+  }
+
+  for (thread* t : threads) {
+    t->join();
+    delete t;
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rw_semaphore.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rw_semaphore.h b/be/src/kudu/util/rw_semaphore.h
new file mode 100644
index 0000000..de0bb59
--- /dev/null
+++ b/be/src/kudu/util/rw_semaphore.h
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_RW_SEMAPHORE_H
+#define KUDU_UTIL_RW_SEMAPHORE_H
+
+#include
+#include
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/debug-util.h"
+
+#include "kudu/util/thread.h"
+
+namespace kudu {
+
+// Read-Write semaphore. 32bit uint that contains the number of readers.
+// When a thread wants to write, it tries to set the high (32nd) bit and waits +// until the readers have finished. Readers spin while the write flag is set. +// +// This rw-semaphore makes no attempt at fairness, though it does avoid write +// starvation (no new readers may obtain the lock if a writer is waiting). +// +// NOTE: this means that it is not safe to reentrantly acquire the read lock, +// due to the following deadlock: +// - T1: acquire read lock +// - T2: wait for write lock +// (blocks waiting for readers) +// - T1: try to acquire read-lock reentrantly +// (blocks to avoid starving writers) +// +// Given that this is currently based only on spinning (and not futex), +// it should only be used in cases where the lock is held for very short +// time intervals. +// +// If the semaphore is expected to always be released from the same thread +// that acquired it, use rw_spinlock instead. +// +// In order to support easier debugging of leaked locks, this class can track +// the stack trace of the last thread to lock it in write mode. To do so, +// uncomment the following define: +// #define RW_SEMAPHORE_TRACK_HOLDER 1 +// ... and then in gdb, print the contents of the semaphore, and you should +// see the collected stack trace. +class rw_semaphore { + public: + rw_semaphore() : state_(0) { + } + ~rw_semaphore() {} + + void lock_shared() { + int loop_count = 0; + Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_); + while (true) { + Atomic32 expected = cur_state & kNumReadersMask; // I expect no write lock + Atomic32 try_new_state = expected + 1; // Add me as reader + cur_state = base::subtle::Acquire_CompareAndSwap(&state_, expected, try_new_state); + if (cur_state == expected) + break; + // Either was already locked by someone else, or CAS failed.
+ boost::detail::yield(loop_count++); + } + } + + void unlock_shared() { + int loop_count = 0; + Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_); + while (true) { + DCHECK_GT(cur_state & kNumReadersMask, 0) + << "unlock_shared() called when there are no shared locks held"; + Atomic32 expected = cur_state; // I expect a write lock and other readers + Atomic32 try_new_state = expected - 1; // Drop me as reader + cur_state = base::subtle::Release_CompareAndSwap(&state_, expected, try_new_state); + if (cur_state == expected) + break; + // Either was already locked by someone else, or CAS failed. + boost::detail::yield(loop_count++); + } + } + + // Tries to acquire a write lock, if no one else has it. + // This function retries on CAS failure and waits for readers to complete. + bool try_lock() { + int loop_count = 0; + Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_); + while (true) { + // Someone else already holds the write lock. + if (cur_state & kWriteFlag) + return false; + + Atomic32 expected = cur_state & kNumReadersMask; // I expect some 0+ readers + Atomic32 try_new_state = kWriteFlag | expected; // I want to lock the other writers + cur_state = base::subtle::Acquire_CompareAndSwap(&state_, expected, try_new_state); + if (cur_state == expected) + break; + // Either was already locked by someone else, or CAS failed.
+ boost::detail::yield(loop_count++); + } + + WaitPendingReaders(); + RecordLockHolderStack(); + return true; + } + + void lock() { + int loop_count = 0; + Atomic32 cur_state = base::subtle::NoBarrier_Load(&state_); + while (true) { + Atomic32 expected = cur_state & kNumReadersMask; // I expect some 0+ readers + Atomic32 try_new_state = kWriteFlag | expected; // I want to lock the other writers + // Note: we use NoBarrier here because we'll do the Acquire barrier down below + // in WaitPendingReaders + cur_state = base::subtle::NoBarrier_CompareAndSwap(&state_, expected, try_new_state); + if (cur_state == expected) + break; + // Either was already locked by someone else, or CAS failed. + boost::detail::yield(loop_count++); + } + + WaitPendingReaders(); + +#ifndef NDEBUG + writer_tid_ = Thread::CurrentThreadId(); +#endif // NDEBUG + RecordLockHolderStack(); + } + + void unlock() { + // I expect to be the only writer + DCHECK_EQ(base::subtle::NoBarrier_Load(&state_), kWriteFlag); + +#ifndef NDEBUG + writer_tid_ = -1; // Invalid tid. +#endif // NDEBUG + + ResetLockHolderStack(); + // Reset: no writers & no readers. + Release_Store(&state_, 0); + } + + // Return true if the lock is currently held for write by any thread. + // See simple_semaphore::is_locked() for details about where this is useful. + bool is_write_locked() const { + return base::subtle::NoBarrier_Load(&state_) & kWriteFlag; + } + + // Return true if the lock is currently held, either for read or write + // by any thread. + // See simple_semaphore::is_locked() for details about where this is useful. 
+ bool is_locked() const { + return base::subtle::NoBarrier_Load(&state_); + } + + private: + static const uint32_t kNumReadersMask = 0x7fffffff; + static const uint32_t kWriteFlag = 1 << 31; + +#ifdef RW_SEMAPHORE_TRACK_HOLDER + StackTrace writer_stack_; + void RecordLockHolderStack() { + writer_stack_.Collect(); + } + void ResetLockHolderStack() { + writer_stack_.Reset(); + } +#else + void RecordLockHolderStack() { + } + void ResetLockHolderStack() { + } +#endif + + void WaitPendingReaders() { + int loop_count = 0; + while ((base::subtle::Acquire_Load(&state_) & kNumReadersMask) > 0) { + boost::detail::yield(loop_count++); + } + } + + private: + volatile Atomic32 state_; +#ifndef NDEBUG + int64_t writer_tid_; +#endif // NDEBUG +}; + +} // namespace kudu +#endif /* KUDU_UTIL_RW_SEMAPHORE_H */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rwc_lock-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/rwc_lock-test.cc b/be/src/kudu/util/rwc_lock-test.cc new file mode 100644 index 0000000..99c8071 --- /dev/null +++ b/be/src/kudu/util/rwc_lock-test.cc @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include +#include + +#include "kudu/gutil/atomicops.h" +#include "kudu/util/rwc_lock.h" +#include "kudu/util/test_util.h" +#include "kudu/util/locks.h" + +namespace kudu { + +using base::subtle::NoBarrier_Load; +using base::subtle::Release_Store; +using std::string; +using std::thread; +using std::vector; + +class RWCLockTest : public KuduTest {}; + +// Holds counters of how many threads hold the lock in each of the +// provided modes. +struct LockHoldersCount { + LockHoldersCount() + : num_readers(0), + num_writers(0), + num_committers(0) { + } + + // Check the invariants of the lock counts. + void CheckInvariants() { + // At no time should we have more than one writer or committer. + CHECK_LE(num_writers, 1); + CHECK_LE(num_committers, 1); + + // If we have any readers, then we should not have any committers. + if (num_readers > 0) { + CHECK_EQ(num_committers, 0); + } + } + + void AdjustReaders(int delta) { + std::lock_guard l(lock); + num_readers += delta; + CheckInvariants(); + } + + void AdjustWriters(int delta) { + std::lock_guard l(lock); + num_writers += delta; + CheckInvariants(); + } + + void AdjustCommitters(int delta) { + std::lock_guard l(lock); + num_committers += delta; + CheckInvariants(); + } + + int num_readers; + int num_writers; + int num_committers; + simple_spinlock lock; +}; + +struct SharedState { + LockHoldersCount counts; + RWCLock rwc_lock; + Atomic32 stop; +}; + +void ReaderThread(SharedState* state) { + while (!NoBarrier_Load(&state->stop)) { + state->rwc_lock.ReadLock(); + state->counts.AdjustReaders(1); + state->counts.AdjustReaders(-1); + state->rwc_lock.ReadUnlock(); + } +} + +void WriterThread(SharedState* state) { + string local_str; + while (!NoBarrier_Load(&state->stop)) { + state->rwc_lock.WriteLock(); + state->counts.AdjustWriters(1); + + state->rwc_lock.UpgradeToCommitLock(); + state->counts.AdjustWriters(-1); + state->counts.AdjustCommitters(1); + + state->counts.AdjustCommitters(-1); + 
state->rwc_lock.CommitUnlock(); + } +} + + +TEST_F(RWCLockTest, TestCorrectBehavior) { + SharedState state; + Release_Store(&state.stop, 0); + + vector threads; + + const int kNumWriters = 5; + const int kNumReaders = 5; + + for (int i = 0; i < kNumWriters; i++) { + threads.emplace_back(WriterThread, &state); + } + for (int i = 0; i < kNumReaders; i++) { + threads.emplace_back(ReaderThread, &state); + } + + if (AllowSlowTests()) { + SleepFor(MonoDelta::FromSeconds(1)); + } else { + SleepFor(MonoDelta::FromMilliseconds(100)); + } + + Release_Store(&state.stop, 1); + + for (thread& t : threads) { + t.join(); + } +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rwc_lock.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/rwc_lock.cc b/be/src/kudu/util/rwc_lock.cc new file mode 100644 index 0000000..efe3ccc --- /dev/null +++ b/be/src/kudu/util/rwc_lock.cc @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/rwc_lock.h" + +#include + +#ifndef NDEBUG +#include "kudu/gutil/walltime.h" +#include "kudu/util/debug-util.h" +#include "kudu/util/env.h" +#include "kudu/util/thread.h" +#endif // NDEBUG + +namespace kudu { + +RWCLock::RWCLock() + : no_mutators_(&lock_), + no_readers_(&lock_), + reader_count_(0), +#ifdef NDEBUG + write_locked_(false) { +#else + write_locked_(false), + last_writer_tid_(0), + last_writelock_acquire_time_(0) { + last_writer_backtrace_[0] = '\0'; +#endif // NDEBUG +} + +RWCLock::~RWCLock() { + CHECK_EQ(reader_count_, 0); +} + +void RWCLock::ReadLock() { + MutexLock l(lock_); + reader_count_++; +} + +void RWCLock::ReadUnlock() { + MutexLock l(lock_); + DCHECK_GT(reader_count_, 0); + reader_count_--; + if (reader_count_ == 0) { + no_readers_.Signal(); + } +} + +bool RWCLock::HasReaders() const { + MutexLock l(lock_); + return reader_count_ > 0; +} + +bool RWCLock::HasWriteLock() const { + MutexLock l(lock_); +#ifndef NDEBUG + return last_writer_tid_ == Thread::CurrentThreadId(); +#else + return write_locked_; +#endif +} + +void RWCLock::WriteLock() { + MutexLock l(lock_); + // Wait for any other mutations to finish. + while (write_locked_) { + no_mutators_.Wait(); + } +#ifndef NDEBUG + last_writelock_acquire_time_ = GetCurrentTimeMicros(); + last_writer_tid_ = Thread::CurrentThreadId(); + HexStackTraceToString(last_writer_backtrace_, kBacktraceBufSize); +#endif // NDEBUG + write_locked_ = true; +} + +void RWCLock::WriteUnlock() { + MutexLock l(lock_); + DCHECK(write_locked_); + write_locked_ = false; +#ifndef NDEBUG + last_writer_backtrace_[0] = '\0'; +#endif // NDEBUG + no_mutators_.Signal(); +} + +void RWCLock::UpgradeToCommitLock() { + lock_.lock(); + DCHECK(write_locked_); + while (reader_count_ > 0) { + no_readers_.Wait(); + } + DCHECK(write_locked_); + + // Leaves the lock held, which prevents any new readers + // or writers. 
+} + +void RWCLock::CommitUnlock() { + DCHECK_EQ(0, reader_count_); + write_locked_ = false; +#ifndef NDEBUG + last_writer_backtrace_[0] = '\0'; +#endif // NDEBUG + no_mutators_.Broadcast(); + lock_.unlock(); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rwc_lock.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/rwc_lock.h b/be/src/kudu/util/rwc_lock.h new file mode 100644 index 0000000..6d4cb70 --- /dev/null +++ b/be/src/kudu/util/rwc_lock.h @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_RWC_LOCK_H +#define KUDU_UTIL_RWC_LOCK_H + +#include "kudu/gutil/macros.h" +#include "kudu/util/condition_variable.h" +#include "kudu/util/mutex.h" + +namespace kudu { + +// A read-write-commit lock. +// +// This lock has three modes: read, write, and commit. +// The lock compatibility matrix is as follows: +// +// Read Write Commit +// Read X X +// Write X +// Commit +// +// An 'X' indicates that the two types of locks may be +// held at the same time. +// +// In prose: +// - Multiple threads may hold the Read lock at the same time. 
+// - A single thread may hold the Write lock, potentially at the +// same time as any number of readers. +// - A single thread may hold the Commit lock, but this lock is completely +// exclusive (no concurrent readers or writers). +// +// A typical use case for this type of lock is when a structure is read often, +// occasionally updated, and the update operation can take a long time. In this +// use case, the readers simply use ReadLock() and ReadUnlock(), while the +// writer uses a copy-on-write technique like: +// +// obj->lock.WriteLock(); +// // NOTE: cannot safely mutate obj->state directly here, since readers +// // may be concurrent! So, we make a local copy to mutate. +// my_local_copy = obj->state; +// SomeLengthyMutation(my_local_copy); +// obj->lock.UpgradeToCommitLock(); +// obj->state = my_local_copy; +// obj->lock.CommitUnlock(); +// +// This is more efficient than a standard Reader-Writer lock since the lengthy +// mutation is only protected against other concurrent mutators, and readers +// may continue to run with no contention. +// +// For the common pattern described above, the 'CowObject<>' template class defined +// in cow_object.h is more convenient than manual locking. +// +// NOTE: this implementation currently does not provide any starvation protection +// or fairness. If the read lock is being constantly acquired (i.e. the reader count +// never drops to 0) then UpgradeToCommitLock() may block arbitrarily long. +class RWCLock { + public: + RWCLock(); + ~RWCLock(); + + // Acquire the lock in read mode. Upon return, guarantees that: + // - Other threads may concurrently hold the lock for Read. + // - Either zero or one thread may hold the lock for Write. + // - No threads hold the lock for Commit. + void ReadLock(); + void ReadUnlock(); + + // Return true if there are any readers currently holding the lock. + // Useful for debug assertions. + bool HasReaders() const; + + // Return true if the current thread holds the write lock.
+ // + // In DEBUG mode this is accurate -- we track the current holder's tid. + // In non-DEBUG mode, this may sometimes return true even if another thread + // is in fact the holder. + // Thus, this is only really useful in the context of a DCHECK assertion. + bool HasWriteLock() const; + + // Boost-like wrappers, so boost lock guards work + void lock_shared() { ReadLock(); } + void unlock_shared() { ReadUnlock(); } + + // Acquire the lock in write mode. Upon return, guarantees that: + // - Other threads may concurrently hold the lock for Read. + // - No other threads hold the lock for Write or Commit. + void WriteLock(); + void WriteUnlock(); + + // Boost-like wrappers + void lock() { WriteLock(); } + void unlock() { WriteUnlock(); } + + // Upgrade the lock from Write mode to Commit mode. + // Requires that the current thread holds the lock in Write mode. + // Upon return, guarantees: + // - No other thread holds the lock in any mode. + void UpgradeToCommitLock(); + void CommitUnlock(); + + private: + // Lock which protects reader_count_ and write_locked_. + // Additionally, while the commit lock is held, the + // locking thread holds this mutex, which prevents any new + // threads from obtaining the lock in any mode. 
+ mutable Mutex lock_; + ConditionVariable no_mutators_, no_readers_; + int reader_count_; + bool write_locked_; + +#ifndef NDEBUG + static const int kBacktraceBufSize = 1024; + int64_t last_writer_tid_; + int64_t last_writelock_acquire_time_; + char last_writer_backtrace_[kBacktraceBufSize]; +#endif // NDEBUG + + DISALLOW_COPY_AND_ASSIGN(RWCLock); +}; + +} // namespace kudu +#endif /* KUDU_UTIL_RWC_LOCK_H */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/safe_math-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/safe_math-test.cc b/be/src/kudu/util/safe_math-test.cc new file mode 100644 index 0000000..d3a81c6 --- /dev/null +++ b/be/src/kudu/util/safe_math-test.cc @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +// Must come before gtest.h. 
+#include "kudu/gutil/mathlimits.h" + +#include +#include "kudu/util/safe_math.h" + +namespace kudu { +template +static void DoTest(T a, T b, bool expected) { + SCOPED_TRACE(a); + SCOPED_TRACE(b); + bool overflow = false; + T ret = AddWithOverflowCheck(a, b, &overflow); + EXPECT_EQ(overflow, expected); + if (!overflow) { + EXPECT_EQ(ret, a + b); + } +} + +TEST(TestSafeMath, TestSignedInts) { + // Overflow above max of range. + DoTest(MathLimits::kMax - 10, 15, true); + DoTest(MathLimits::kMax - 10, 10, false); + + // Underflow around negative + DoTest(MathLimits::kMin + 10, -15, true); + DoTest(MathLimits::kMin + 10, -5, false); + +} + +TEST(TestSafeMath, TestUnsignedInts) { + // Overflow above max + DoTest(MathLimits::kMax - 10, 15, true); + DoTest(MathLimits::kMax - 10, 10, false); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/safe_math.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/safe_math.h b/be/src/kudu/util/safe_math.h new file mode 100644 index 0000000..4c126dd --- /dev/null +++ b/be/src/kudu/util/safe_math.h @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// +// Inline functions for doing overflow-safe operations on integers. +// These should be used when doing bounds checks on user-provided data, +// for example. +// See also: https://www.securecoding.cert.org/confluence/display/cplusplus/INT32-CPP.+Ensure+that+operations+on+signed+integers+do+not+result+in+overflow +#ifndef KUDU_UTIL_SAFE_MATH_H +#define KUDU_UTIL_SAFE_MATH_H + +#include "kudu/gutil/mathlimits.h" + +namespace kudu { + +namespace safe_math_internal { + +// Template which is specialized for signed and unsigned types separately. +template +struct WithOverflowCheck { +}; + + +// Specialization for signed types. +template +struct WithOverflowCheck { + static inline Type Add(Type a, Type b, bool *overflowed) { + // Implementation from the CERT article referenced in the file header. + *overflowed = (((a > 0) && (b > 0) && (a > (MathLimits::kMax - b))) || + ((a < 0) && (b < 0) && (a < (MathLimits::kMin - b)))); + return a + b; + } +}; + +// Specialization for unsigned types. +template +struct WithOverflowCheck { + static inline Type Add(Type a, Type b, bool *overflowed) { + Type ret = a + b; + *overflowed = ret < a; + return a + b; + } +}; + +} // namespace safe_math_internal + +// Add 'a' and 'b', and set *overflowed to true if overflow occurred. +template +inline Type AddWithOverflowCheck(Type a, Type b, bool *overflowed) { + // Pick the right specialization based on whether Type is signed.
+ typedef safe_math_internal::WithOverflowCheck::kIsSigned> my_struct; + return my_struct::Add(a, b, overflowed); +} + +} // namespace kudu +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/scoped_cleanup-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/scoped_cleanup-test.cc b/be/src/kudu/util/scoped_cleanup-test.cc new file mode 100644 index 0000000..fad50a0 --- /dev/null +++ b/be/src/kudu/util/scoped_cleanup-test.cc @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/scoped_cleanup.h" + +#include + +namespace kudu { + +TEST(ScopedCleanup, TestCleanup) { + int var = 0; + { + auto saved = var; + auto cleanup = MakeScopedCleanup([&] () { var = saved; }); + var = 42; + } + ASSERT_EQ(0, var); +} + +TEST(ScopedCleanup, TestCancelCleanup) { + int var = 0; + { + auto saved = var; + auto cleanup = MakeScopedCleanup([&] () { var = saved; }); + var = 42; + cleanup.cancel(); + } + ASSERT_EQ(42, var); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/scoped_cleanup.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/scoped_cleanup.h b/be/src/kudu/util/scoped_cleanup.h new file mode 100644 index 0000000..e989331 --- /dev/null +++ b/be/src/kudu/util/scoped_cleanup.h @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +namespace kudu { + +// A scoped object which runs a cleanup function when going out of scope. Can +// be used for scoped resource cleanup. 
+template +class ScopedCleanup { + public: + explicit ScopedCleanup(F f) + : cancelled_(false), + f_(std::move(f)) { + } + ~ScopedCleanup() { + if (!cancelled_) { + f_(); + } + } + void cancel() { cancelled_ = true; } + + private: + bool cancelled_; + F f_; +}; + +// Creates a new scoped cleanup instance with the provided function. +template +ScopedCleanup MakeScopedCleanup(F f) { + return ScopedCleanup(f); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/semaphore.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/semaphore.cc b/be/src/kudu/util/semaphore.cc new file mode 100644 index 0000000..985da44 --- /dev/null +++ b/be/src/kudu/util/semaphore.cc @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/semaphore.h" + +#include +#include +#include "kudu/gutil/walltime.h" +namespace kudu { + +Semaphore::Semaphore(int capacity) { + DCHECK_GE(capacity, 0); + if (sem_init(&sem_, 0, capacity) != 0) { + Fatal("init"); + } +} + +Semaphore::~Semaphore() { + if (sem_destroy(&sem_) != 0) { + Fatal("destroy"); + } +} + +void Semaphore::Acquire() { + while (true) { + int ret = sem_wait(&sem_); + if (ret == 0) { + // TODO: would be nice to track acquisition time, etc. + return; + } + + if (errno == EINTR) continue; + Fatal("wait"); + } +} + +bool Semaphore::TryAcquire() { + int ret = sem_trywait(&sem_); + if (ret == 0) { + return true; + } + if (errno == EAGAIN || errno == EINTR) { + return false; + } + Fatal("trywait"); +} + +bool Semaphore::TimedAcquire(const MonoDelta& timeout) { + int64_t microtime = GetCurrentTimeMicros(); + microtime += timeout.ToMicroseconds(); + + struct timespec abs_timeout; + MonoDelta::NanosToTimeSpec(microtime * MonoTime::kNanosecondsPerMicrosecond, + &abs_timeout); + + while (true) { + int ret = sem_timedwait(&sem_, &abs_timeout); + if (ret == 0) return true; + if (errno == ETIMEDOUT) return false; + if (errno == EINTR) continue; + Fatal("timedwait"); + } +} + +void Semaphore::Release() { + PCHECK(sem_post(&sem_) == 0); +} + +int Semaphore::GetValue() { + int val; + PCHECK(sem_getvalue(&sem_, &val) == 0); + return val; +} + +void Semaphore::Fatal(const char* action) { + PLOG(FATAL) << "Could not " << action << " semaphore " + << reinterpret_cast(&sem_); + abort(); // unnecessary, but avoids gcc complaining +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/semaphore.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/semaphore.h b/be/src/kudu/util/semaphore.h new file mode 100644 index 0000000..88a1086 --- /dev/null +++ b/be/src/kudu/util/semaphore.h @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation 
(ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_SEMAPHORE_H +#define KUDU_UTIL_SEMAPHORE_H + +#include +#if defined(__APPLE__) +#include +#include "kudu/util/atomic.h" +#endif // define(__APPLE__) + +#include "kudu/gutil/macros.h" +#include "kudu/gutil/port.h" +#include "kudu/util/monotime.h" + +namespace kudu { + +// Wrapper for POSIX semaphores. +class Semaphore { + public: + // Initialize the semaphore with the specified capacity. + explicit Semaphore(int capacity); + ~Semaphore(); + + // Acquire the semaphore. + void Acquire(); + + // Acquire the semaphore within the given timeout. Returns true if successful. + bool TimedAcquire(const MonoDelta& timeout); + + // Try to acquire the semaphore immediately. Returns false if unsuccessful. + bool TryAcquire(); + + // Release the semaphore. + void Release(); + + // Get the current value of the semaphore. + int GetValue(); + + // Boost-compatible wrappers. + void lock() { Acquire(); } + void unlock() { Release(); } + bool try_lock() { return TryAcquire(); } + + private: +#if !defined(__APPLE__) + // Log a fatal error message. Separated out to keep the main functions + // as small as possible in terms of code size. 
+ void Fatal(const char* action) ATTRIBUTE_NORETURN; +#endif // !define(__APPLE__) + +#if defined(__APPLE__) + dispatch_semaphore_t sem_; + AtomicInt count_; +#else + sem_t sem_; +#endif // define(__APPLE__) + DISALLOW_COPY_AND_ASSIGN(Semaphore); +}; + +} // namespace kudu +#endif /* KUDU_UTIL_SEMAPHORE_H */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/semaphore_macosx.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/semaphore_macosx.cc b/be/src/kudu/util/semaphore_macosx.cc new file mode 100644 index 0000000..7cc5f08 --- /dev/null +++ b/be/src/kudu/util/semaphore_macosx.cc @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/semaphore.h" + +#include +#include +#include "kudu/gutil/walltime.h" + +namespace kudu { + +Semaphore::Semaphore(int capacity) + : count_(capacity) { + DCHECK_GE(capacity, 0); + sem_ = dispatch_semaphore_create(capacity); + CHECK_NOTNULL(sem_); +} + +Semaphore::~Semaphore() { + dispatch_release(sem_); +} + +void Semaphore::Acquire() { + // If the timeout is DISPATCH_TIME_FOREVER, then dispatch_semaphore_wait() + // waits forever and always returns zero. 
+  CHECK(dispatch_semaphore_wait(sem_, DISPATCH_TIME_FOREVER) == 0);
+  count_.IncrementBy(-1);
+}
+
+bool Semaphore::TryAcquire() {
+  // The dispatch_semaphore_wait() function returns zero upon success and
+  // non-zero after the timeout expires.
+  if (dispatch_semaphore_wait(sem_, DISPATCH_TIME_NOW) == 0) {
+    count_.IncrementBy(-1);
+    return true;
+  }
+  return false;
+}
+
+bool Semaphore::TimedAcquire(const MonoDelta& timeout) {
+  dispatch_time_t t = dispatch_time(DISPATCH_TIME_NOW, timeout.ToNanoseconds());
+  if (dispatch_semaphore_wait(sem_, t) == 0) {
+    count_.IncrementBy(-1);
+    return true;
+  }
+  return false;
+}
+
+void Semaphore::Release() {
+  dispatch_semaphore_signal(sem_);
+  count_.IncrementBy(1);
+}
+
+int Semaphore::GetValue() {
+  return count_.Load();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/signal.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/signal.cc b/be/src/kudu/util/signal.cc
new file mode 100644
index 0000000..2de3000
--- /dev/null
+++ b/be/src/kudu/util/signal.cc
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/signal.h"
+
+#include "kudu/util/logging.h"
+
+namespace kudu {
+
+void SetSignalHandler(int signal, SignalHandlerCallback handler) {
+  struct sigaction act;
+  act.sa_handler = handler;
+  sigemptyset(&act.sa_mask);
+  act.sa_flags = 0;
+  PCHECK(sigaction(signal, &act, nullptr) == 0);
+}
+
+void IgnoreSigPipe() {
+  SetSignalHandler(SIGPIPE, SIG_IGN);
+}
+
+void ResetSigPipeHandlerToDefault() {
+  SetSignalHandler(SIGPIPE, SIG_DFL);
+}
+
+// We unblock all signal masks since they are inherited.
+void ResetAllSignalMasksToUnblocked() {
+  sigset_t signals;
+  PCHECK(sigfillset(&signals) == 0);
+  PCHECK(sigprocmask(SIG_UNBLOCK, &signals, nullptr) == 0);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/signal.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/signal.h b/be/src/kudu/util/signal.h
new file mode 100644
index 0000000..0c88a80
--- /dev/null
+++ b/be/src/kudu/util/signal.h
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include
+
+namespace kudu {
+
+#if defined(__linux__)
+typedef sighandler_t SignalHandlerCallback;
+#else
+typedef sig_t SignalHandlerCallback;
+#endif
+
+// Set a process-wide signal handler.
+void SetSignalHandler(int signal, SignalHandlerCallback handler);
+
+// Set the disposition of SIGPIPE to SIG_IGN.
+void IgnoreSigPipe();
+
+// Set the disposition of SIGPIPE to SIG_DFL.
+void ResetSigPipeHandlerToDefault();
+
+// Unblock all signal masks.
+void ResetAllSignalMasksToUnblocked();
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/slice-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/slice-test.cc b/be/src/kudu/util/slice-test.cc
new file mode 100644
index 0000000..b0d3bef
--- /dev/null
+++ b/be/src/kudu/util/slice-test.cc
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/slice.h"
+
+#include
+
+#include "kudu/gutil/map-util.h"
+
+using std::string;
+
+namespace kudu {
+
+typedef SliceMap::type MySliceMap;
+
+TEST(SliceTest, TestSliceMap) {
+  MySliceMap my_map;
+  Slice a("a");
+  Slice b("b");
+  Slice c("c");
+
+  // Insertion is deliberately out-of-order; the map should restore order.
+  InsertOrDie(&my_map, c, 3);
+  InsertOrDie(&my_map, a, 1);
+  InsertOrDie(&my_map, b, 2);
+
+  int expectedValue = 0;
+  for (const MySliceMap::value_type& pair : my_map) {
+    int data = 'a' + expectedValue++;
+    ASSERT_EQ(Slice(reinterpret_cast(&data), 1), pair.first);
+    ASSERT_EQ(expectedValue, pair.second);
+  }
+
+  expectedValue = 0;
+  for (auto iter = my_map.begin(); iter != my_map.end(); iter++) {
+    int data = 'a' + expectedValue++;
+    ASSERT_EQ(Slice(reinterpret_cast(&data), 1), iter->first);
+    ASSERT_EQ(expectedValue, iter->second);
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/slice.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/slice.cc b/be/src/kudu/util/slice.cc
new file mode 100644
index 0000000..7e6ab50
--- /dev/null
+++ b/be/src/kudu/util/slice.cc
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/slice.h"
+
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/util/status.h"
+#include "kudu/util/logging.h"
+
+namespace kudu {
+
+Status Slice::check_size(size_t expected_size) const {
+  if (PREDICT_FALSE(size() != expected_size)) {
+    return Status::Corruption(StringPrintf("Unexpected Slice size. "
+        "Expected %zu but got %zu.", expected_size, size()), KUDU_REDACT(ToDebugString(100)));
+  }
+  return Status::OK();
+}
+
+// Return a string that contains the copy of the referenced data.
+std::string Slice::ToString() const {
+  return std::string(reinterpret_cast(data_), size_);
+}
+
+std::string Slice::ToDebugString(size_t max_len) const {
+  size_t bytes_to_print = size_;
+  bool abbreviated = false;
+  if (max_len != 0 && bytes_to_print > max_len) {
+    bytes_to_print = max_len;
+    abbreviated = true;
+  }
+
+  int size = 0;
+  for (int i = 0; i < bytes_to_print; i++) {
+    if (!isgraph(data_[i])) {
+      size += 4;
+    } else {
+      size++;
+    }
+  }
+  if (abbreviated) {
+    size += 20; // extra padding
+  }
+
+  std::string ret;
+  ret.reserve(size);
+  for (int i = 0; i < bytes_to_print; i++) {
+    if (!isgraph(data_[i])) {
+      StringAppendF(&ret, "\\x%02x", data_[i] & 0xff);
+    } else {
+      ret.push_back(data_[i]);
+    }
+  }
+  if (abbreviated) {
+    StringAppendF(&ret, "...<%zd bytes total>", size_);
+  }
+  return ret;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/slice.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/slice.h b/be/src/kudu/util/slice.h
new file mode 100644
index 0000000..8738a5d
--- /dev/null
+++ b/be/src/kudu/util/slice.h
@@ -0,0 +1,317 @@
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+
+#ifndef KUDU_UTIL_SLICE_H_
+#define KUDU_UTIL_SLICE_H_
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#ifdef KUDU_HEADERS_USE_RICH_SLICE
+#include "kudu/gutil/strings/fastmem.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/util/faststring.h"
+#endif
+#include "kudu/util/kudu_export.h"
+
+namespace kudu {
+
+class Status;
+
+/// @brief A wrapper around externally allocated data.
+///
+/// Slice is a simple structure containing a pointer into some external
+/// storage and a size. The user of a Slice must ensure that the slice
+/// is not used after the corresponding external storage has been
+/// deallocated.
+///
+/// Multiple threads can invoke const methods on a Slice without
+/// external synchronization, but if any of the threads may call a
+/// non-const method, all threads accessing the same Slice must use
+/// external synchronization.
+///
+/// Slices can be built around faststrings and StringPieces using constructors
+/// with implicit casts. Both StringPieces and faststrings depend on a great
+/// deal of gutil code.
+class KUDU_EXPORT Slice {
+ public:
+  /// Create an empty slice.
+  Slice() : data_(reinterpret_cast("")),
+            size_(0) { }
+
+  /// Create a slice that refers to a @c uint8_t byte array.
+  ///
+  /// @param [in] d
+  ///   The input array.
+  /// @param [in] n
+  ///   Number of bytes in the array.
+  Slice(const uint8_t* d, size_t n) : data_(d), size_(n) { }
+
+  /// Create a slice that refers to a @c char byte array.
+  ///
+  /// @param [in] d
+  ///   The input array.
+  /// @param [in] n
+  ///   Number of bytes in the array.
+  Slice(const char* d, size_t n) :
+    data_(reinterpret_cast(d)),
+    size_(n) { }
+
+  /// Create a slice that refers to the contents of the given string.
+  ///
+  /// @param [in] s
+  ///   The input string.
+  Slice(const std::string& s) : // NOLINT(runtime/explicit)
+    data_(reinterpret_cast(s.data())),
+    size_(s.size()) { }
+
+  /// Create a slice that refers to a C-string s[0,strlen(s)-1].
+  ///
+  /// @param [in] s
+  ///   The input C-string.
+  Slice(const char* s) : // NOLINT(runtime/explicit)
+    data_(reinterpret_cast(s)),
+    size_(strlen(s)) { }
+
+#ifdef KUDU_HEADERS_USE_RICH_SLICE
+  /// Create a slice that refers to the contents of a faststring.
+  ///
+  /// @note Further appends to the faststring may invalidate this slice.
+  ///
+  /// @param [in] s
+  ///   The input faststring.
+  Slice(const faststring &s) // NOLINT(runtime/explicit)
+    : data_(s.data()),
+      size_(s.size()) {
+  }
+
+  /// Create a slice that refers to the contents of a string piece.
+  ///
+  /// @param [in] s
+  ///   The input StringPiece.
+  Slice(const StringPiece& s) // NOLINT(runtime/explicit)
+    : data_(reinterpret_cast(s.data())),
+      size_(s.size()) {
+  }
+#endif
+
+  /// @return A pointer to the beginning of the referenced data.
+  const uint8_t* data() const { return data_; }
+
+  /// @return A mutable pointer to the beginning of the referenced data.
+  uint8_t *mutable_data() { return const_cast(data_); }
+
+  /// @return The length (in bytes) of the referenced data.
+  size_t size() const { return size_; }
+
+  /// @return @c true iff the length of the referenced data is zero.
+  bool empty() const { return size_ == 0; }
+
+  /// @pre n < size()
+  ///
+  /// @param [in] n
+  ///   The index of the byte.
+  /// @return the n-th byte in the referenced data.
+  const uint8_t &operator[](size_t n) const {
+    assert(n < size());
+    return data_[n];
+  }
+
+  /// Change this slice to refer to an empty array.
+  void clear() {
+    data_ = reinterpret_cast("");
+    size_ = 0;
+  }
+
+  /// Drop the first "n" bytes from this slice.
+  ///
+  /// @pre n <= size()
+  ///
+  /// @note Only the base and bounds of the slice are changed;
+  ///   the data is not modified.
+  ///
+  /// @param [in] n
+  ///   Number of bytes that should be dropped from the beginning.
+  void remove_prefix(size_t n) {
+    assert(n <= size());
+    data_ += n;
+    size_ -= n;
+  }
+
+  /// Truncate the slice to the given number of bytes.
+  ///
+  /// @pre n <= size()
+  ///
+  /// @note Only the base and bounds of the slice are changed;
+  ///   the data is not modified.
+  ///
+  /// @param [in] n
+  ///   The new size of the slice.
+  void truncate(size_t n) {
+    assert(n <= size());
+    size_ = n;
+  }
+
+  /// Check that the slice has the expected size.
+  ///
+  /// @param [in] expected_size
+  /// @return Status::Corruption() iff size() != @c expected_size
+  Status check_size(size_t expected_size) const;
+
+  /// @return A string that contains a copy of the referenced data.
+  std::string ToString() const;
+
+  /// Get printable representation of the data in the slice.
+  ///
+  /// @param [in] max_len
+  ///   The maximum number of bytes to output in the printable format;
+  ///   @c 0 means no limit.
+  /// @return A string with printable representation of the data.
+  std::string ToDebugString(size_t max_len = 0) const;
+
+  /// Do a three-way comparison of the slice's data.
+  ///
+  /// @param [in] b
+  ///   The other slice to compare with.
+  /// @return Values are
+  ///   @li < 0 iff "*this" < "b"
+  ///   @li == 0 iff "*this" == "b"
+  ///   @li > 0 iff "*this" > "b"
+  int compare(const Slice& b) const;
+
+  /// Check whether the slice starts with the given prefix.
+  /// @param [in] x
+  ///   The slice in question.
+  /// @return @c true iff "x" is a prefix of "*this"
+  bool starts_with(const Slice& x) const {
+    return ((size_ >= x.size_) &&
+            (MemEqual(data_, x.data_, x.size_)));
+  }
+
+  /// @brief Comparator struct, useful for ordered collections (like STL maps).
+  struct Comparator {
+    /// Compare two slices using Slice::compare()
+    ///
+    /// @param [in] a
+    ///   The slice to call Slice::compare() at.
+    /// @param [in] b
+    ///   The slice to use as a parameter for Slice::compare().
+    /// @return @c true iff @c a is less than @c b by Slice::compare().
+    bool operator()(const Slice& a, const Slice& b) const {
+      return a.compare(b) < 0;
+    }
+  };
+
+  /// Relocate/copy the slice's data into a new location.
+  ///
+  /// @param [in] d
+  ///   The new location for the data. If it's the same location, then no
+  ///   relocation is done. It is assumed that the new location is
+  ///   large enough to fit the data.
+  void relocate(uint8_t* d) {
+    if (data_ != d) {
+      memcpy(d, data_, size_);
+      data_ = d;
+    }
+  }
+
+ private:
+  friend bool operator==(const Slice& x, const Slice& y);
+
+  static bool MemEqual(const void* a, const void* b, size_t n) {
+#ifdef KUDU_HEADERS_USE_RICH_SLICE
+    return strings::memeq(a, b, n);
+#else
+    return memcmp(a, b, n) == 0;
+#endif
+  }
+
+  static int MemCompare(const void* a, const void* b, size_t n) {
+#ifdef KUDU_HEADERS_USE_RICH_SLICE
+    return strings::fastmemcmp_inlined(a, b, n);
+#else
+    return memcmp(a, b, n);
+#endif
+  }
+
+  const uint8_t* data_;
+  size_t size_;
+
+  // Intentionally copyable
+};
+
+/// Check whether two slices are identical.
+///
+/// @param [in] x
+///   One slice.
+/// @param [in] y
+///   Another slice.
+/// @return @c true iff two slices contain byte-for-byte identical data.
+inline bool operator==(const Slice& x, const Slice& y) {
+  return ((x.size() == y.size()) &&
+          (Slice::MemEqual(x.data(), y.data(), x.size())));
+}
+
+/// Check whether two slices are not identical.
+///
+/// @param [in] x
+///   One slice.
+/// @param [in] y
+///   Another slice.
+/// @return @c true iff slices contain different data.
+inline bool operator!=(const Slice& x, const Slice& y) {
+  return !(x == y);
+}
+
+/// Output printable representation of the slice into the given output stream.
+///
+/// @param [out] o
+///   The output stream to print the info.
+/// @param [in] s
+///   The slice to print.
+/// @return Reference to the updated output stream.
+inline std::ostream& operator<<(std::ostream& o, const Slice& s) {
+  return o << s.ToDebugString(16); // should be enough for anyone...
+}
+
+inline int Slice::compare(const Slice& b) const {
+  const int min_len = (size_ < b.size_) ? size_ : b.size_;
+  int r = MemCompare(data_, b.data_, min_len);
+  if (r == 0) {
+    if (size_ < b.size_) r = -1;
+    else if (size_ > b.size_) r = +1;
+  }
+  return r;
+}
+
+/// @brief STL map whose keys are Slices.
+///
+/// An example of usage:
+/// @code
+///   typedef SliceMap::type MySliceMap;
+///
+///   MySliceMap my_map;
+///   my_map.insert(MySliceMap::value_type(a, 1));
+///   my_map.insert(MySliceMap::value_type(b, 2));
+///   my_map.insert(MySliceMap::value_type(c, 3));
+///
+///   for (const MySliceMap::value_type& pair : my_map) {
+///     ...
+///   }
+/// @endcode
+template
+struct SliceMap {
+  /// A handy typedef for the slice map with appropriate comparison operator.
+  typedef std::map type;
+};
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_SLICE_H_

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/spinlock_profiling-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/spinlock_profiling-test.cc b/be/src/kudu/util/spinlock_profiling-test.cc
new file mode 100644
index 0000000..4960c0f
--- /dev/null
+++ b/be/src/kudu/util/spinlock_profiling-test.cc
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include
+#include
+#include
+
+#include "kudu/gutil/spinlock.h"
+#include "kudu/util/spinlock_profiling.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/trace.h"
+
+// Can't include gutil/synchronization_profiling.h directly as it'll
+// declare a weak symbol directly in this unit test, which the runtime
+// linker will prefer over equivalent strong symbols for some reason. By
+// declaring the symbol without providing an empty definition, the strong
+// symbols are chosen when provided via shared libraries.
+//
+// Further reading:
+// - http://stackoverflow.com/questions/20658809/dynamic-loading-and-weak-symbol-resolution
+// - http://notmysock.org/blog/php/weak-symbols-arent.html
+namespace gutil {
+extern void SubmitSpinLockProfileData(const void *, int64);
+} // namespace gutil
+
+namespace base {
+extern void SubmitSpinLockProfileData(const void *, int64);
+} // namespace base
+
+namespace kudu {
+
+class SpinLockProfilingTest : public KuduTest {};
+
+TEST_F(SpinLockProfilingTest, TestSpinlockProfiling) {
+  scoped_refptr t(new Trace);
+  base::SpinLock lock;
+  {
+    ADOPT_TRACE(t.get());
+    gutil::SubmitSpinLockProfileData(&lock, 4000000);
+  }
+  string result = t->DumpToString();
+  LOG(INFO) << "trace: " << result;
+  ASSERT_STR_CONTAINS(result, "\"spinlock_wait_cycles\":4000000");
+  // We can't assert more specifically because the CyclesPerSecond
+  // on different machines might be different.
+  ASSERT_STR_CONTAINS(result, "Waited ");
+  ASSERT_STR_CONTAINS(result, "on lock ");
+
+  ASSERT_GT(GetSpinLockContentionMicros(), 0);
+}
+
+TEST_F(SpinLockProfilingTest, TestStackCollection) {
+  StartSynchronizationProfiling();
+  base::SpinLock lock;
+  gutil::SubmitSpinLockProfileData(&lock, 12345);
+  StopSynchronizationProfiling();
+  std::ostringstream str;
+  int64_t dropped = 0;
+  FlushSynchronizationProfile(&str, &dropped);
+  string s = str.str();
+  ASSERT_STR_CONTAINS(s, "12345\t1 @ ");
+  ASSERT_EQ(0, dropped);
+}
+
+TEST_F(SpinLockProfilingTest, TestTcmallocContention) {
+  StartSynchronizationProfiling();
+  base::SubmitSpinLockProfileData(nullptr, 12345);
+  ASSERT_GE(GetTcmallocContentionMicros(), 0);
+}
+
+} // namespace kudu
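The ordering rule in `Slice::compare()` above works in two steps. It runs `memcmp` over the shorter length, and if that common prefix is equal, the shorter slice sorts first. `Slice::Comparator` wraps that rule so it can key a `std::map`, and `ToDebugString()` escapes non-graphic bytes as `\xNN`. The following is a minimal standalone sketch of the same three ideas that compiles without Kudu's headers. It is an illustration, not Kudu's implementation. `ByteView`, `ByteViewLess`, `ByteViewMap`, and `DebugString` are hypothetical names, and the sketch omits the `max_len` truncation suffix that the real `ToDebugString()` appends.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <map>
#include <string>

// Hypothetical stand-in for kudu::Slice (not Kudu's API): a non-owning
// pointer plus a length. The caller must keep the bytes alive.
struct ByteView {
  const uint8_t* data;
  size_t size;

  explicit ByteView(const char* s)
      : data(reinterpret_cast<const uint8_t*>(s)), size(0) {
    assert(s != nullptr);
    size = std::strlen(s);
  }

  // Three-way comparison following the same rule as Slice::compare():
  // memcmp over the common prefix, then the shorter view sorts first
  // when that prefix is identical.
  int compare(const ByteView& b) const {
    const size_t min_len = size < b.size ? size : b.size;
    int r = std::memcmp(data, b.data, min_len);
    if (r == 0) {
      if (size < b.size) {
        r = -1;
      } else if (size > b.size) {
        r = +1;
      }
    }
    return r;
  }
};

// Strict weak ordering for ordered containers, analogous to Slice::Comparator.
struct ByteViewLess {
  bool operator()(const ByteView& a, const ByteView& b) const {
    return a.compare(b) < 0;
  }
};

// Analogous in spirit to SliceMap's nested map typedef.
using ByteViewMap = std::map<ByteView, int, ByteViewLess>;

// Printable form: non-graphic bytes become \xNN, as in Slice::ToDebugString().
// Unlike the original, there is no max_len / "...<N bytes total>" suffix.
std::string DebugString(const ByteView& v) {
  std::string out;
  out.reserve(v.size);
  for (size_t i = 0; i < v.size; i++) {
    const unsigned char c = v.data[i];
    if (std::isgraph(c)) {
      out.push_back(static_cast<char>(c));
    } else {
      char buf[5];  // "\xNN" plus the terminating NUL
      std::snprintf(buf, sizeof(buf), "\\x%02x", c);
      out += buf;
    }
  }
  return out;
}
```

Inserting keys "c", "a", "b" into a `ByteViewMap` iterates them back as "a", "b", "c". That is the property `slice-test.cc` checks for `SliceMap`. `ByteView("ab").compare(ByteView("abc"))` is negative because the shorter view wins the tie. As with `Slice`, the map stores only pointers, so the keyed bytes (string literals here) must outlive it.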