impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From taras...@apache.org
Subject [05/30] incubator-impala git commit: IMPALA-4669: [KUTIL] Import kudu_util library from kudu@314c9d8
Date Sat, 17 Jun 2017 07:25:31 GMT
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/spinlock_profiling.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/spinlock_profiling.cc b/be/src/kudu/util/spinlock_profiling.cc
new file mode 100644
index 0000000..001f8d5
--- /dev/null
+++ b/be/src/kudu/util/spinlock_profiling.cc
@@ -0,0 +1,348 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/spinlock_profiling.h"
+
+#include <sstream>
+
+#include <glog/logging.h>
+#include <gflags/gflags.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/spinlock.h"
+#include "kudu/gutil/strings/human_readable.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/striped64.h"
+#include "kudu/util/trace.h"
+
+// Spinlock waits longer than this many CPU cycles are written, together with
+// the waiter's stack, into the currently active Trace (if any); see
+// SubmitSpinLockProfileData().
+DEFINE_int32(lock_contention_trace_threshold_cycles,
+             2000000, // 2M cycles should be about 1ms (assuming a ~2GHz clock)
+             "If acquiring a spinlock takes more than this number of "
+             "cycles, and a Trace is currently active, then the current "
+             "stack trace is logged to the trace buffer.");
+TAG_FLAG(lock_contention_trace_threshold_cycles, hidden);
+
+// Server-level gauges exposed as counters. They are instantiated by
+// RegisterSpinLockContentionMetrics() and read their values from
+// GetSpinLockContentionMicros() and GetTcmallocContentionMicros() respectively.
+METRIC_DEFINE_gauge_uint64(server, spinlock_contention_time,
+    "Spinlock Contention Time", kudu::MetricUnit::kMicroseconds,
+    "Amount of time consumed by contention on internal spinlocks since the server "
+    "started. If this increases rapidly, it may indicate a performance issue in Kudu "
+    "internals triggered by a particular workload and warrant investigation.",
+    kudu::EXPOSE_AS_COUNTER);
+METRIC_DEFINE_gauge_uint64(server, tcmalloc_contention_time,
+    "TCMalloc Contention Time", kudu::MetricUnit::kMicroseconds,
+    "Amount of time consumed by contention on tcmalloc's locks since the server "
+    "started. If this increases rapidly, it may indicate a performance issue in Kudu "
+    "internals triggered by a particular workload and warrant investigation.",
+    kudu::EXPOSE_AS_COUNTER);
+
+
+using base::SpinLock;
+using base::SpinLockHolder;
+
+namespace kudu {
+
+// Conversion factor used when turning cycle counts into microseconds.
+static const double kMicrosPerSecond = 1000000.0;
+
+// Total cycles spent waiting on contended kudu spinlocks. Allocated once by
+// DoInit() and published/read through base::subtle atomic operations (via a
+// reinterpret_cast to AtomicWord*), so it must stay a plain pointer. Null until
+// InitSpinLockContentionProfiling() has run.
+static LongAdder* g_contended_cycles = nullptr;
+
+namespace {
+
+// Total cycles spent waiting on tcmalloc's internal spinlocks, accumulated by
+// base::SubmitSpinLockProfileData() and read by GetTcmallocContentionMicros().
+//
+// We can't use LongAdder for tcmalloc contention, because its
+// implementation can allocate memory, and doing so is prohibited
+// in the tcmalloc contention callback.
+//
+// We pad and align this struct to two cachelines to avoid any false
+// sharing with the g_contended_cycles counter or any other globals.
+struct PaddedAtomic64 {
+  // The counter itself.
+  Atomic64 val;
+  // Fills the remainder of the two cachelines.
+  char padding[CACHELINE_SIZE * 2 - sizeof(Atomic64)];
+} CACHELINE_ALIGNED;
+static PaddedAtomic64 g_tcmalloc_contention;
+
+// Implements a very simple linear-probing hashtable of stack traces with
+// a fixed number of entries.
+//
+// Threads experiencing contention record their stacks into this hashtable,
+// or increment an already-existing entry. Each entry has its own SpinLock.
+// A writer that cannot take an entry's lock immediately skips to the next
+// probe slot, so a single stack may be spread across multiple buckets.
+//
+// A thread collecting a profile collects stack traces out of the hash table
+// and resets the counts to 0 as they are collected.
+class ContentionStacks {
+ public:
+  ContentionStacks()
+    : dropped_samples_(0) {
+  }
+
+  // Add a stack trace that waited 'cycles' cycles to the table. If no matching
+  // or free slot can be locked within kNumLinearProbeAttempts probes, the
+  // sample is counted in dropped_samples_ instead.
+  void AddStack(const StackTrace& s, int64_t cycles);
+
+  // Flush stacks from the buffer to 'out'. See the docs for FlushSynchronizationProfile()
+  // in spinlock_profiling.h for details on format. The number of samples dropped
+  // since the previous flush is added to '*dropped' and the drop counter is reset.
+  //
+  // On return, guarantees that any stack traces that were present at the beginning of
+  // the call have been flushed. However, new stacks can be added concurrently with this call.
+  void Flush(std::ostringstream* out, int64_t* dropped);
+
+ private:
+
+  // Collect the next non-empty sample from the underlying buffer, and set it back
+  // to 0 count (thus marking it as "empty"). Returns false once every remaining
+  // slot has been visited without finding a sample.
+  //
+  // 'iterator' serves as a way to keep track of the current position in the buffer.
+  // Callers should initially set it to 0, and then pass the same pointer to each
+  // call to CollectSample. This serves to loop through the collected samples.
+  bool CollectSample(uint64_t* iterator, StackTrace* s, int64_t* trip_count, int64_t* cycles);
+
+  // Hashtable entry.
+  struct Entry {
+    Entry() : trip_count(0),
+              cycle_count(0) {
+    }
+
+    // Protects all other fields of this entry.
+    SpinLock lock;
+
+    // The number of times we've experienced contention with a stack trace equal
+    // to 'trace'.
+    //
+    // If this is 0, then the entry is "unclaimed" and the other fields are not
+    // considered valid.
+    int64_t trip_count;
+
+    // The total number of cycles spent waiting at this stack trace.
+    int64_t cycle_count;
+
+    // A cached hashcode of 'trace'. Not set by the constructor: it is written
+    // when the entry is claimed and only read while trip_count != 0.
+    uint64_t hash;
+
+    // The actual stack trace.
+    StackTrace trace;
+  };
+
+  enum {
+    // Number of slots in the table.
+    kNumEntries = 1024,
+    // Number of consecutive slots AddStack() tries before dropping a sample.
+    kNumLinearProbeAttempts = 4
+  };
+  Entry entries_[kNumEntries];
+
+  // The number of samples which were dropped due to contention on this structure or
+  // due to the hashtable being too full.
+  AtomicInt<int64_t> dropped_samples_;
+};
+
+// Count of StartSynchronizationProfiling() calls minus
+// StopSynchronizationProfiling() calls. Stacks are recorded into
+// g_contention_stacks while this is non-zero.
+Atomic32 g_profiling_enabled = 0;
+// Table of contended stacks. Allocated once by DoInit(); null before that.
+ContentionStacks* g_contention_stacks = nullptr;
+
+void ContentionStacks::AddStack(const StackTrace& s, int64_t cycles) {
+  const uint64_t h = s.HashCode();
+
+  // Probe up to kNumLinearProbeAttempts consecutive slots, starting at the
+  // stack's home bucket.
+  for (int probe = 0; probe < kNumLinearProbeAttempts; ++probe) {
+    Entry& slot = entries_[(h + probe) % kNumEntries];
+
+    // A busy slot is simply skipped. It's OK if one stack shows up in several
+    // slots, because pprof aggregates them in the end anyway.
+    if (!slot.lock.TryLock()) continue;
+
+    const bool unclaimed = slot.trip_count == 0;
+    if (!unclaimed && (slot.hash != h || !slot.trace.Equals(s))) {
+      // Owned by a different stack trace: keep probing.
+      slot.lock.Unlock();
+      continue;
+    }
+    if (unclaimed) {
+      // Take ownership of the free slot for this stack.
+      slot.hash = h;
+      slot.trace.CopyFrom(s);
+    }
+
+    slot.cycle_count += cycles;
+    ++slot.trip_count;
+    slot.lock.Unlock();
+    return;
+  }
+
+  // Every probed slot was either locked or owned by another stack.
+  dropped_samples_.Increment();
+}
+
+// Drains every non-empty slot of this table into 'out', one
+// "<cycles>\t<trip count> @ <hex stack>" line per slot, then adds the number of
+// samples dropped since the previous flush to '*dropped'.
+void ContentionStacks::Flush(std::ostringstream* out, int64_t* dropped) {
+  uint64_t iterator = 0;
+  StackTrace t;
+  int64_t cycles;
+  int64_t count;
+  // Drain this instance's own table rather than reaching through the global
+  // g_contention_stacks, so Flush() is correct for any ContentionStacks.
+  while (CollectSample(&iterator, &t, &count, &cycles)) {
+    // '\n' rather than std::endl: flushing a string stream per line is pointless.
+    *out << cycles << "\t" << count
+         << " @ " << t.ToHexString(StackTrace::NO_FIX_CALLER_ADDRESSES)
+         << '\n';
+  }
+
+  // Read-and-reset in one atomic step so that no drop is reported twice.
+  *dropped += dropped_samples_.Exchange(0);
+}
+
+// Scans forward from '*iterator' for the next claimed slot, copies out its
+// trace and counts, and clears the slot. Returns false once the end of the
+// table is reached without finding another sample.
+bool ContentionStacks::CollectSample(uint64_t* iterator, StackTrace* s, int64_t* trip_count,
+                                     int64_t* cycles) {
+  while (*iterator < kNumEntries) {
+    Entry& entry = entries_[*iterator];
+    ++*iterator;
+    SpinLockHolder guard(&entry.lock);
+    if (entry.trip_count != 0) {
+      *trip_count = entry.trip_count;
+      *cycles = entry.cycle_count;
+      s->CopyFrom(entry.trace);
+
+      // A zero trip_count marks the slot as unclaimed again.
+      entry.trip_count = 0;
+      entry.cycle_count = 0;
+      return true;
+    }
+  }
+
+  // Reached the end of the table without finding another sample.
+  return false;
+}
+
+
+// Contention hook for kudu's (gutil) spinlocks, reached through
+// gutil::SubmitSpinLockProfileData() with the number of cycles the calling
+// thread waited to acquire 'contendedlock'.
+//
+// Every call charges the wait to the current trace's "spinlock_wait_cycles"
+// counter and to g_contended_cycles, which backs GetSpinLockContentionMicros()
+// and the spinlock_contention_time metric. In addition:
+//  - while synchronization profiling is enabled, the waiter's stack is
+//    recorded in g_contention_stacks;
+//  - if the wait exceeds FLAGS_lock_contention_trace_threshold_cycles and a
+//    Trace is active, the stack is logged into that Trace.
+void SubmitSpinLockProfileData(const void *contendedlock, int64 wait_cycles) {
+  TRACE_COUNTER_INCREMENT("spinlock_wait_cycles", wait_cycles);
+
+  // Account every contended wait. This must happen before the short circuit
+  // below; otherwise the "total contention" metric silently omits every wait
+  // shorter than the trace threshold whenever profiling is off.
+  // The pointer is null until InitSpinLockContentionProfiling() has run; the
+  // acquire load pairs with the Release_Store in DoInit().
+  LongAdder* la = reinterpret_cast<LongAdder*>(
+      base::subtle::Acquire_Load(reinterpret_cast<AtomicWord*>(&g_contended_cycles)));
+  if (la) {
+    la->IncrementBy(wait_cycles);
+  }
+
+  bool profiling_enabled = base::subtle::Acquire_Load(&g_profiling_enabled);
+  bool long_wait_time = wait_cycles > FLAGS_lock_contention_trace_threshold_cycles;
+  // Short circuit this function quickly in the common case.
+  if (PREDICT_TRUE(!profiling_enabled && !long_wait_time)) {
+    return;
+  }
+
+  // Per-thread guard: skip the stack collection and tracing below if this
+  // thread is already executing them (e.g. a spinlock contended inside them).
+  static __thread bool in_func = false;
+  if (in_func) return; // non-re-entrant
+  in_func = true;
+
+  StackTrace stack;
+  stack.Collect();
+
+  if (profiling_enabled) {
+    DCHECK_NOTNULL(g_contention_stacks)->AddStack(stack, wait_cycles);
+  }
+
+  if (PREDICT_FALSE(long_wait_time)) {
+    Trace* t = Trace::CurrentTrace();
+    if (t) {
+      double seconds = static_cast<double>(wait_cycles) / base::CyclesPerSecond();
+      char backtrace_buffer[1024];
+      stack.StringifyToHex(backtrace_buffer, arraysize(backtrace_buffer));
+      TRACE_TO(t, "Waited $0 on lock $1. stack: $2",
+               HumanReadableElapsedTime::ToShortString(seconds), contendedlock,
+               backtrace_buffer);
+    }
+  }
+
+  in_func = false;
+}
+
+// One-time initializer, run via GoogleOnceInit() from
+// InitSpinLockContentionProfiling(). Allocates the contention stack table and
+// the contended-cycle accumulator (neither is freed in this file). Each pointer
+// is published with a release store, so readers that use Acquire_Load see a
+// fully-constructed object.
+void DoInit() {
+  base::subtle::Release_Store(reinterpret_cast<AtomicWord*>(&g_contention_stacks),
+                              reinterpret_cast<uintptr_t>(new ContentionStacks()));
+  base::subtle::Release_Store(reinterpret_cast<AtomicWord*>(&g_contended_cycles),
+                              reinterpret_cast<uintptr_t>(new LongAdder()));
+}
+
+} // anonymous namespace
+
+// Allocates the global profiling state. GoogleOnceInit() ensures that DoInit()
+// runs only once, so this may be called any number of times.
+void InitSpinLockContentionProfiling() {
+  static GoogleOnceType init_once = GOOGLE_ONCE_INIT;
+  GoogleOnceInit(&init_once, &DoInit);
+}
+
+
+// Instantiates the spinlock and tcmalloc contention gauges on 'entity' and marks
+// them never-retire. The gauges read the process-wide counters on demand.
+void RegisterSpinLockContentionMetrics(const scoped_refptr<MetricEntity>& entity) {
+  // The spinlock gauge dereferences g_contended_cycles, so make sure it exists.
+  InitSpinLockContentionProfiling();
+
+  auto spinlock_gauge = METRIC_spinlock_contention_time.InstantiateFunctionGauge(
+      entity, Bind(&GetSpinLockContentionMicros));
+  entity->NeverRetire(spinlock_gauge);
+
+  auto tcmalloc_gauge = METRIC_tcmalloc_contention_time.InstantiateFunctionGauge(
+      entity, Bind(&GetTcmallocContentionMicros));
+  entity->NeverRetire(tcmalloc_gauge);
+}
+
+// Converts a CPU cycle count into whole microseconds (truncated toward zero)
+// using the measured cycle frequency. Shared by both contention getters below.
+static uint64_t ContentionCyclesToMicros(int64_t cycles) {
+  const double micros = static_cast<double>(cycles) / base::CyclesPerSecond()
+    * kMicrosPerSecond;
+  // Explicit truncating conversions (via int64_t, as the counters are signed).
+  return static_cast<uint64_t>(static_cast<int64_t>(micros));
+}
+
+uint64_t GetSpinLockContentionMicros() {
+  // Acquire load, pairing with the Release_Store that publishes the counter.
+  // This matches how SubmitSpinLockProfileData() reads it.
+  LongAdder* la = reinterpret_cast<LongAdder*>(
+      base::subtle::Acquire_Load(reinterpret_cast<AtomicWord*>(&g_contended_cycles)));
+  return ContentionCyclesToMicros(DCHECK_NOTNULL(la)->Value());
+}
+
+uint64_t GetTcmallocContentionMicros() {
+  return ContentionCyclesToMicros(
+      base::subtle::NoBarrier_Load(&g_tcmalloc_contention.val));
+}
+
+// Increments the global "profiling enabled" count. Stacks are recorded while
+// the count is non-zero, so calls nest with StopSynchronizationProfiling().
+void StartSynchronizationProfiling() {
+  InitSpinLockContentionProfiling();  // the stack table must exist first
+  using base::subtle::Barrier_AtomicIncrement;
+  Barrier_AtomicIncrement(&g_profiling_enabled, 1);
+}
+
+// Writes all currently buffered contention samples to 'out' and adds the number
+// of dropped samples to '*drop_count'.
+void FlushSynchronizationProfile(std::ostringstream* out,
+                                 int64_t* drop_count) {
+  // Like Start/StopSynchronizationProfiling(), make sure the global state
+  // exists. A flush before any Start() then yields an empty profile instead
+  // of a CHECK failure.
+  InitSpinLockContentionProfiling();
+  CHECK_NOTNULL(g_contention_stacks)->Flush(out, drop_count);
+}
+
+// Undoes one StartSynchronizationProfiling() call. CHECK-fails if the
+// enable count would go negative (more Stops than Starts).
+void StopSynchronizationProfiling() {
+  InitSpinLockContentionProfiling();
+  const Atomic32 remaining_starts =
+      base::subtle::Barrier_AtomicIncrement(&g_profiling_enabled, -1);
+  CHECK_GE(remaining_starts, 0);
+}
+
+} // namespace kudu
+
+// gutil's SpinLock reports contention through a hook in the gutil namespace.
+// Forward it to the implementation in kudu's anonymous namespace.
+namespace gutil {
+void SubmitSpinLockProfileData(const void *contendedlock, int64 wait_cycles) {
+  ::kudu::SubmitSpinLockProfileData(contendedlock, wait_cycles);
+}
+} // namespace gutil
+
+// tcmalloc's internal spinlocks also support submitting contention metrics
+// using the base::SubmitSpinLockProfileData weak symbol. However, this function might be
+// called while tcmalloc's heap lock is held. Thus, we cannot allocate memory here or else
+// we risk a deadlock. So, this implementation just does the bare minimum to expose
+// tcmalloc contention. It charges the wait to the current Trace's metrics (except
+// on macOS) and adds it to g_tcmalloc_contention, which GetTcmallocContentionMicros()
+// reads.
+namespace base {
+void SubmitSpinLockProfileData(const void* contendedlock, int64 wait_cycles) {
+#if !defined(__APPLE__)
+  // NOTE(review): assumes Trace::CurrentTrace() and the trace-metrics increment
+  // never allocate; confirm, since allocating here could deadlock (see above).
+  auto t = kudu::Trace::CurrentTrace();
+  if (t) {
+    t->metrics()->IncrementTcmallocContentionCycles(wait_cycles);
+  }
+#endif // !defined(__APPLE__)
+  // No ordering needed: readers only load the running total (NoBarrier_Load).
+  base::subtle::NoBarrier_AtomicIncrement(&kudu::g_tcmalloc_contention.val, wait_cycles);
+}
+} // namespace base

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/spinlock_profiling.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/spinlock_profiling.h b/be/src/kudu/util/spinlock_profiling.h
new file mode 100644
index 0000000..d5b5f15
--- /dev/null
+++ b/be/src/kudu/util/spinlock_profiling.h
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_SPINLOCK_PROFILING_H
+#define KUDU_UTIL_SPINLOCK_PROFILING_H
+
+#include <iosfwd>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+
+namespace kudu {
+
+class MetricEntity;
+
+// Enable instrumentation of spinlock contention.
+//
+// On first call this allocates the process-wide contention stack table and the
+// contended-cycle accumulator. Later calls do nothing. Calling it from somewhere
+// reachable in your code also ensures that the spinlock_profiling.cc object file
+// gets linked into your executable, so that gcc doesn't omit the module that
+// provides the contention hooks from the binary.
+void InitSpinLockContentionProfiling();
+
+// Return the total number of microseconds of spinlock contention accumulated
+// by the contention hook since InitSpinLockContentionProfiling() first ran.
+// Must not be called before that.
+uint64_t GetSpinLockContentionMicros();
+
+// Return the total number of microseconds spent in tcmalloc contention
+// since the process started.
+uint64_t GetTcmallocContentionMicros();
+
+// Register metrics in the given server entity which measure the amount of
+// spinlock and tcmalloc contention. Initializes contention profiling if needed.
+void RegisterSpinLockContentionMetrics(const scoped_refptr<MetricEntity>& entity);
+
+// Enable process-wide synchronization profiling.
+//
+// While profiling is enabled, spinlock contention will be recorded in a buffer.
+// The caller should periodically call FlushSynchronizationProfile() to empty
+// the buffer, or else profiles may be dropped. Calls nest: profiling stays
+// enabled until a matching number of StopSynchronizationProfiling() calls.
+void StartSynchronizationProfiling();
+
+// Flush the current buffer of contention profile samples to the given stream.
+//
+// Each stack trace that has been observed results in at least one line of the
+// following (tab-separated) format:
+//   <cycles>\t<trip count> @ <hex stack trace>
+//
+// Flushing the data also clears the current buffer of trace samples.
+// This may be called while synchronization profiling is enabled or after it has
+// been disabled.
+//
+// *drop_count will be incremented by the number of samples which were dropped
+// due to the contention buffer overflowing. If profiling is enabled during this
+// call, then the 'drop_count' may be slightly out-of-date with respect to the
+// returned samples.
+void FlushSynchronizationProfile(std::ostringstream* out, int64_t* drop_count);
+
+// Stop collecting contention profiles. Each call undoes one
+// StartSynchronizationProfiling(); an unbalanced call CHECK-fails.
+void StopSynchronizationProfiling();
+
+} // namespace kudu
+#endif /* KUDU_UTIL_SPINLOCK_PROFILING_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/stack_watchdog-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/stack_watchdog-test.cc b/be/src/kudu/util/stack_watchdog-test.cc
new file mode 100644
index 0000000..9bbb097
--- /dev/null
+++ b/be/src/kudu/util/stack_watchdog-test.cc
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/kernel_stack_watchdog.h"
+
+#include <gflags/gflags.h>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+DECLARE_int32(hung_task_check_interval_ms);
+
+namespace kudu {
+
+// Fixture that makes the kernel stack watchdog keep its log messages so tests
+// can inspect them, and shortens the watchdog's check interval to 10ms.
+class StackWatchdogTest : public KuduTest {
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+    KernelStackWatchdog::GetInstance()->SaveLogsForTests(true);
+    FLAGS_hung_task_check_interval_ms = 10;
+  }
+};
+
+// The KernelStackWatchdog is only enabled on Linux, since we can't get kernel
+// stack traces on other platforms.
+#if defined(__linux__)
+// Sleeping inside a watched scope must produce watchdog reports that include
+// both this test's frame and the kernel's nanosleep frame.
+TEST_F(StackWatchdogTest, TestWatchdog) {
+  vector<string> log;
+  {
+    SCOPED_WATCH_STACK(20);
+    // Poll up to 50 times. Wait for several samples, since it's possible that
+    // we get unlucky and the watchdog sees us just before or after a sleep.
+    for (int attempt = 0; attempt < 50 && log.size() <= 5; attempt++) {
+      SleepFor(MonoDelta::FromMilliseconds(100));
+      log = KernelStackWatchdog::GetInstance()->LoggedMessagesForTests();
+    }
+  }
+  const string joined = JoinStrings(log, "\n");
+  ASSERT_STR_CONTAINS(joined, "TestWatchdog_Test::TestBody()");
+  ASSERT_STR_CONTAINS(joined, "nanosleep");
+}
+#endif
+
+// Test that SCOPED_WATCH_STACK scopes can be nested: the watchdog's reports
+// must mention the source line of both the outer and the inner scope.
+TEST_F(StackWatchdogTest, TestNestedScopes) {
+  vector<string> log;
+  // Source lines of the two SCOPED_WATCH_STACK uses. Each '__LINE__' must stay
+  // on the same physical line as the macro it identifies.
+  int line1;
+  int line2;
+  {
+    SCOPED_WATCH_STACK(20); line1 = __LINE__;
+    {
+      SCOPED_WATCH_STACK(20); line2 = __LINE__;
+      // Poll (up to 50 x 100ms) until a few reports have been logged.
+      for (int i = 0; i < 50; i++) {
+        SleepFor(MonoDelta::FromMilliseconds(100));
+        log = KernelStackWatchdog::GetInstance()->LoggedMessagesForTests();
+        if (log.size() > 3) {
+          break;
+        }
+      }
+    }
+  }
+
+  // Verify that both nested scopes were collected.
+  string s = JoinStrings(log, "\n");
+  ASSERT_STR_CONTAINS(s, Substitute("stack_watchdog-test.cc:$0", line1));
+  ASSERT_STR_CONTAINS(s, Substitute("stack_watchdog-test.cc:$0", line2));
+}
+
+// Times one million entries into (and exits from) a watched scope.
+TEST_F(StackWatchdogTest, TestPerformance) {
+  // Restore a reasonable check interval; otherwise the benchmark wastes a lot
+  // of CPU running the watchdog thread too often.
+  FLAGS_hung_task_check_interval_ms = 500;
+  LOG_TIMING(INFO, "1M SCOPED_WATCH_STACK()s") {
+    for (int remaining = 1000000; remaining > 0; --remaining) {
+      SCOPED_WATCH_STACK(100);
+    }
+  }
+}
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/status-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/status-test.cc b/be/src/kudu/util/status-test.cc
new file mode 100644
index 0000000..ca33b89
--- /dev/null
+++ b/be/src/kudu/util/status-test.cc
@@ -0,0 +1,98 @@
+// Some portions Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include <errno.h>
+#include <vector>
+#include "kudu/util/status.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+
+namespace kudu {
+
+TEST(StatusTest, TestPosixCode) {
+  Status ok = Status::OK();
+  ASSERT_EQ(0, ok.posix_code());
+  Status file_error = Status::IOError("file error", Slice(), ENOTDIR);
+  ASSERT_EQ(ENOTDIR, file_error.posix_code());
+}
+
+TEST(StatusTest, TestToString) {
+  Status file_error = Status::IOError("file error", Slice(), ENOTDIR);
+  ASSERT_EQ(string("IO error: file error (error 20)"), file_error.ToString());
+}
+
+// CloneAndPrepend() puts the new text in front of the existing message and
+// keeps the code and errno.
+TEST(StatusTest, TestClonePrepend) {
+  const Status original = Status::IOError("file error", "msg2", ENOTDIR);
+  const Status prepended = original.CloneAndPrepend("Heading");
+  const string expected = "IO error: Heading: file error: msg2 (error 20)";
+  ASSERT_EQ(expected, prepended.ToString());
+}
+
+// CloneAndAppend() adds the new text after the existing message.
+TEST(StatusTest, TestCloneAppend) {
+  const Status remote_error = Status::RemoteError("Application error");
+  const string suffix = Status::NotFound("Unknown tablet").ToString();
+  const Status appended = remote_error.CloneAndAppend(suffix);
+  ASSERT_EQ(string("Remote error: Application error: Not found: Unknown tablet"),
+            appended.ToString());
+}
+
+TEST(StatusTest, TestMemoryUsage) {
+  ASSERT_EQ(0, Status::OK().memory_footprint_excluding_this());
+  ASSERT_GT(Status::IOError(
+      "file error", "some other thing", ENOTDIR).memory_footprint_excluding_this(), 0);
+}
+
+// Move construction transfers the state and leaves the source OK.
+TEST(StatusTest, TestMoveConstructor) {
+  // OK->OK move should do nothing.
+  {
+    Status src = Status::OK();
+    Status dst = std::move(src);
+    // Inspecting 'src' after the move is intentional: a moved-from Status is
+    // expected to be OK.
+    ASSERT_OK(src);
+    ASSERT_OK(dst);
+  }
+
+  // Moving a not-OK status into a new one should make the moved-from status
+  // "OK" and give the new status the original message.
+  {
+    Status src = Status::NotFound("foo");
+    Status dst = std::move(src);
+    ASSERT_OK(src);
+    ASSERT_EQ("Not found: foo", dst.ToString());
+  }
+}
+
+// Move assignment replaces the destination's state and leaves the source OK,
+// for every combination of OK and non-OK operands.
+TEST(StatusTest, TestMoveAssignment) {
+  // OK->Bad move: the (already OK) source stays OK and the destination's
+  // error is replaced by OK.
+  {
+    Status src = Status::OK();
+    Status dst = Status::NotFound("orig dst");
+    dst = std::move(src);
+    ASSERT_OK(src);
+    ASSERT_OK(dst);
+  }
+
+  // Bad->Bad move: the destination takes the source's message.
+  {
+    Status src = Status::NotFound("orig src");
+    Status dst = Status::NotFound("orig dst");
+    dst = std::move(src);
+    ASSERT_OK(src);
+    ASSERT_EQ("Not found: orig src", dst.ToString());
+  }
+
+  // Bad->OK move: the destination takes the source's message.
+  {
+    Status src = Status::NotFound("orig src");
+    Status dst = Status::OK();
+    dst = std::move(src);
+    ASSERT_OK(src);
+    ASSERT_EQ("Not found: orig src", dst.ToString());
+  }
+}
+
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/status.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/status.cc b/be/src/kudu/util/status.cc
new file mode 100644
index 0000000..9f88da1
--- /dev/null
+++ b/be/src/kudu/util/status.cc
@@ -0,0 +1,162 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "kudu/util/status.h"
+
+#include <stdio.h>
+#include <stdint.h>
+
+#include "kudu/gutil/strings/fastmem.h"
+#include "kudu/util/malloc.h"
+
+namespace kudu {
+
+// Returns a new[]-allocated copy of 'state': the 7-byte header followed by the
+// message whose length is stored in the header's first four bytes. The caller
+// owns the result and must release it with delete[].
+const char* Status::CopyState(const char* state) {
+  uint32_t msg_len;
+  strings::memcpy_inlined(&msg_len, state, sizeof(msg_len));
+  const uint32_t total = msg_len + 7;
+  char* copy = new char[total];
+  strings::memcpy_inlined(copy, state, total);
+  return copy;
+}
+
+// Builds a non-OK status. The heap-allocated state is laid out as:
+//   bytes [0, 4)  message length (uint32_t)
+//   byte  4       code
+//   bytes [5, 7)  posix code (int16_t)
+//   bytes [7, ..) msg, then ": " and msg2 when msg2 is non-empty
+Status::Status(Code code, const Slice& msg, const Slice& msg2,
+               int16_t posix_code) {
+  DCHECK(code != kOk);
+  const uint32_t first_len = msg.size();
+  const uint32_t second_len = msg2.size();
+  const bool has_second = second_len != 0;
+  const uint32_t total = has_second ? first_len + 2 + second_len : first_len;
+
+  char* buf = new char[total + 7];
+  memcpy(buf, &total, sizeof(total));
+  buf[4] = static_cast<char>(code);
+  memcpy(buf + 5, &posix_code, sizeof(posix_code));
+
+  char* cursor = buf + 7;
+  memcpy(cursor, msg.data(), first_len);
+  if (has_second) {
+    cursor += first_len;
+    *cursor++ = ':';
+    *cursor++ = ' ';
+    memcpy(cursor, msg2.data(), second_len);
+  }
+  state_ = buf;
+}
+
+// Returns the human-readable name of this status's code ("OK" when there is
+// no state).
+std::string Status::CodeAsString() const {
+  if (state_ == nullptr) {
+    return "OK";
+  }
+
+  // Initialized so that a corrupt or out-of-range code byte yields a readable
+  // string, instead of constructing std::string from an indeterminate pointer
+  // (undefined behavior). There is deliberately no 'default:' label, so
+  // -Wswitch still flags enum values missing from this switch.
+  const char* type = "Unknown error";
+  switch (code()) {
+    case kOk:
+      type = "OK";
+      break;
+    case kNotFound:
+      type = "Not found";
+      break;
+    case kCorruption:
+      type = "Corruption";
+      break;
+    case kNotSupported:
+      type = "Not implemented";
+      break;
+    case kInvalidArgument:
+      type = "Invalid argument";
+      break;
+    case kIOError:
+      type = "IO error";
+      break;
+    case kAlreadyPresent:
+      type = "Already present";
+      break;
+    case kRuntimeError:
+      type = "Runtime error";
+      break;
+    case kNetworkError:
+      type = "Network error";
+      break;
+    case kIllegalState:
+      type = "Illegal state";
+      break;
+    case kNotAuthorized:
+      type = "Not authorized";
+      break;
+    case kAborted:
+      type = "Aborted";
+      break;
+    case kRemoteError:
+      type = "Remote error";
+      break;
+    case kServiceUnavailable:
+      type = "Service unavailable";
+      break;
+    case kTimedOut:
+      type = "Timed out";
+      break;
+    case kUninitialized:
+      type = "Uninitialized";
+      break;
+    case kConfigurationError:
+      type = "Configuration error";
+      break;
+    case kIncomplete:
+      type = "Incomplete";
+      break;
+    case kEndOfFile:
+      type = "End of file";
+      break;
+  }
+  return std::string(type);
+}
+
+// Formats the status as "<code>" for OK, otherwise "<code>: <message>",
+// followed by " (error <posix code>)" unless the posix code is -1.
+std::string Status::ToString() const {
+  std::string out = CodeAsString();
+  if (state_ != nullptr) {
+    out += ": ";
+    const Slice text = message();
+    out.append(reinterpret_cast<const char*>(text.data()), text.size());
+
+    const int16_t err = posix_code();
+    if (err != -1) {
+      char suffix[64];
+      snprintf(suffix, sizeof(suffix), " (error %d)", err);
+      out += suffix;
+    }
+  }
+  return out;
+}
+
+// Returns a view of the message bytes stored after the 7-byte header, or an
+// empty Slice for OK.
+Slice Status::message() const {
+  if (state_ != nullptr) {
+    uint32_t msg_len;
+    memcpy(&msg_len, state_, sizeof(msg_len));
+    return Slice(state_ + 7, msg_len);
+  }
+  return Slice();
+}
+
+// Returns the posix code stored in header bytes 5-6, or 0 for OK.
+int16_t Status::posix_code() const {
+  int16_t code_value = 0;
+  if (state_ != nullptr) {
+    memcpy(&code_value, state_ + 5, sizeof(code_value));
+  }
+  return code_value;
+}
+
+// Returns a status with the same code and posix code whose message is 'msg'.
+// When the current message is non-empty, it follows after ": ".
+Status Status::CloneAndPrepend(const Slice& msg) const {
+  const Slice existing = message();
+  return Status(code(), msg, existing, posix_code());
+}
+
+// Returns a status with the same code and posix code whose message is the
+// current message, followed by ": " and 'msg' when 'msg' is non-empty.
+Status Status::CloneAndAppend(const Slice& msg) const {
+  const Slice existing = message();
+  return Status(code(), existing, msg, posix_code());
+}
+
+// Heap bytes owned by this status: the usable size of its state buffer, or 0
+// when there is no state.
+size_t Status::memory_footprint_excluding_this() const {
+  if (state_ == nullptr) return 0;
+  return kudu_malloc_usable_size(state_);
+}
+
+// Heap bytes attributable to this Status, including the object itself.
+// NOTE(review): kudu_malloc_usable_size(this) presumably requires that this
+// Status was itself heap-allocated. Calling this on a stack or embedded Status
+// looks unsafe; confirm against kudu_malloc_usable_size's contract.
+size_t Status::memory_footprint_including_this() const {
+  return kudu_malloc_usable_size(this) + memory_footprint_excluding_this();
+}
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/status.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/status.h b/be/src/kudu/util/status.h
new file mode 100644
index 0000000..93a25a6
--- /dev/null
+++ b/be/src/kudu/util/status.h
@@ -0,0 +1,433 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+// A Status encapsulates the result of an operation.  It may indicate success,
+// or it may indicate an error with an associated error message.
+//
+// Multiple threads can invoke const methods on a Status without
+// external synchronization, but if any of the threads may call a
+// non-const method, all threads accessing the same Status must use
+// external synchronization.
+
+#ifndef KUDU_UTIL_STATUS_H_
+#define KUDU_UTIL_STATUS_H_
+
+#include <stdint.h>
+#include <string>
+
+#ifdef KUDU_HEADERS_NO_STUBS
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#else
+#include "kudu/client/stubs.h"
+#endif
+
+#include "kudu/util/kudu_export.h"
+#include "kudu/util/slice.h"
+
+// Status-checking helper macros.
+//
+// Each macro below evaluates its status argument exactly once, binding it to
+// a const reference (named `_s`, or `s` in KUDU_RETURN_NOT_OK_RET).
+//
+// NOTE(review): every do { ... } while (0) macro in this group already ends
+// with a ';' after `while (0)`. A caller's own trailing ';' therefore adds an
+// empty statement, so these macros cannot be the un-braced body of an `if`
+// that has an `else` branch (`if (c) KUDU_RETURN_NOT_OK(s); else ...` does not
+// compile). Brace such `if` bodies. Removing the ';' from the macros would fix
+// this, but would break any call site that omits its own semicolon.
+
+/// @brief Return the given status if it is not @c OK.
+#define KUDU_RETURN_NOT_OK(s) do { \
+    const ::kudu::Status& _s = (s);             \
+    if (PREDICT_FALSE(!_s.ok())) return _s;     \
+  } while (0);
+
+/// @brief Return the given status if it is not OK, but first clone it and
+///   prepend the given message.
+#define KUDU_RETURN_NOT_OK_PREPEND(s, msg) do { \
+    const ::kudu::Status& _s = (s);                              \
+    if (PREDICT_FALSE(!_s.ok())) return _s.CloneAndPrepend(msg); \
+  } while (0);
+
+/// @brief Return @c to_return if @c to_call returns a bad status.
+///   The substitution for 'to_return' may reference the variable
+///   @c s for the bad status.
+///
+///   NOTE(review): the local @c s is in scope inside its own initializer, so
+///   a @c to_call expression that mentions a caller variable named @c s
+///   refers to this not-yet-initialized local instead. Avoid naming the
+///   argument @c s at call sites.
+#define KUDU_RETURN_NOT_OK_RET(to_call, to_return) do { \
+    const ::kudu::Status& s = (to_call);                \
+    if (PREDICT_FALSE(!s.ok())) return (to_return);  \
+  } while (0);
+
+/// @brief Emit a warning if @c to_call returns a bad status.
+///   Execution continues after the warning; nothing is returned.
+#define KUDU_WARN_NOT_OK(to_call, warning_prefix) do { \
+    const ::kudu::Status& _s = (to_call);              \
+    if (PREDICT_FALSE(!_s.ok())) { \
+      KUDU_LOG(WARNING) << (warning_prefix) << ": " << _s.ToString();  \
+    } \
+  } while (0);
+
+/// @brief Log the given status and return immediately.
+///   Logs and returns unconditionally, even for an OK status.
+#define KUDU_LOG_AND_RETURN(level, status) do { \
+    const ::kudu::Status& _s = (status);        \
+    KUDU_LOG(level) << _s.ToString(); \
+    return _s; \
+  } while (0);
+
+/// @brief If the given status is not OK, log it and 'msg' at 'level' and return the status.
+///   The log line has the form "Status: <status> <msg>".
+#define KUDU_RETURN_NOT_OK_LOG(s, level, msg) do { \
+    const ::kudu::Status& _s = (s);             \
+    if (PREDICT_FALSE(!_s.ok())) { \
+      KUDU_LOG(level) << "Status: " << _s.ToString() << " " << (msg); \
+      return _s;     \
+    } \
+  } while (0);
+
+/// @brief If @c to_call returns a bad status, CHECK immediately with
+///   a logged message of @c msg followed by the status.
+#define KUDU_CHECK_OK_PREPEND(to_call, msg) do { \
+    const ::kudu::Status& _s = (to_call);                   \
+    KUDU_CHECK(_s.ok()) << (msg) << ": " << _s.ToString();  \
+  } while (0);
+
+/// @brief If the status is bad, CHECK immediately, appending the status to the
+///   logged message.
+#define KUDU_CHECK_OK(s) KUDU_CHECK_OK_PREPEND(s, "Bad status")
+
+/// @brief If @c to_call returns a bad status, DCHECK immediately with
+///   a logged message of @c msg followed by the status.
+#define KUDU_DCHECK_OK_PREPEND(to_call, msg) do { \
+    const ::kudu::Status& _s = (to_call);                   \
+    KUDU_DCHECK(_s.ok()) << (msg) << ": " << _s.ToString();  \
+  } while (0);
+
+/// @brief If the status is bad, DCHECK immediately, appending the status to the
+///   logged 'Bad status' message.
+#define KUDU_DCHECK_OK(s) KUDU_DCHECK_OK_PREPEND(s, "Bad status")
+
+/// @file status.h
+///
+/// This header is used both in the Kudu build and in builds of
+/// applications that use the Kudu C++ client. In the latter case we need to be
+/// careful to "namespace" our macros, to avoid colliding with, or overriding,
+/// similarly named macros belonging to the application.
+///
+/// KUDU_HEADERS_USE_SHORT_STATUS_MACROS handles this behavioral change. When
+/// defined, we're building Kudu and:
+/// @li Non-namespaced macros are allowed and mapped to the namespaced versions
+///   defined above.
+/// @li Namespaced versions of glog macros are mapped to the real glog macros
+///   (otherwise the macros are defined in the C++ client stubs).
+#ifdef KUDU_HEADERS_USE_SHORT_STATUS_MACROS
+// Short, un-prefixed aliases for use inside Kudu itself.
+#define RETURN_NOT_OK         KUDU_RETURN_NOT_OK
+#define RETURN_NOT_OK_PREPEND KUDU_RETURN_NOT_OK_PREPEND
+#define RETURN_NOT_OK_RET     KUDU_RETURN_NOT_OK_RET
+#define WARN_NOT_OK           KUDU_WARN_NOT_OK
+#define LOG_AND_RETURN        KUDU_LOG_AND_RETURN
+#define RETURN_NOT_OK_LOG     KUDU_RETURN_NOT_OK_LOG
+#define CHECK_OK_PREPEND      KUDU_CHECK_OK_PREPEND
+#define CHECK_OK              KUDU_CHECK_OK
+#define DCHECK_OK_PREPEND     KUDU_DCHECK_OK_PREPEND
+#define DCHECK_OK             KUDU_DCHECK_OK
+
+// These are standard glog macros.
+#define KUDU_LOG              LOG
+#define KUDU_CHECK            CHECK
+#define KUDU_DCHECK           DCHECK
+#endif  // KUDU_HEADERS_USE_SHORT_STATUS_MACROS
+
+namespace kudu {
+
+/// @brief A representation of an operation's outcome.
+///
+/// An OK status holds a NULL state pointer and owns no heap memory. An error
+/// status owns a single new[]-allocated buffer encoding the code, the POSIX
+/// code and the message; copying an error status copies that buffer.
+class KUDU_EXPORT Status {
+ public:
+  /// Create an object representing success status.
+  Status() : state_(NULL) { }
+
+  /// Release the error state buffer, if any.
+  ~Status() { delete[] state_; }
+
+  /// Copy the specified status.
+  ///
+  /// @param [in] s
+  ///   The status object to copy from.
+  Status(const Status& s);
+
+  /// Assign the specified status.
+  ///
+  /// @param [in] s
+  ///   The status object to assign from.
+  /// @return The reference to the modified object.
+  Status& operator=(const Status& s);
+
+#if __cplusplus >= 201103L
+  /// Move the specified status (C++11).
+  ///
+  /// @param [in] s
+  ///   rvalue reference to a Status object.
+  Status(Status&& s);
+
+  /// Assign the specified status using move semantics (C++11).
+  ///
+  /// @param [in] s
+  ///   rvalue reference to a Status object.
+  /// @return The reference to the modified object.
+  Status& operator=(Status&& s);
+#endif
+
+  /// @return A success status.
+  static Status OK() { return Status(); }
+
+
+  /// @name Methods to build status objects for various types of errors.
+  ///
+  /// Every factory takes the POSIX code as @c int16_t, matching the two bytes
+  /// reserved for it in the encoded state; @c -1 means "no POSIX code".
+  ///
+  /// @param [in] msg
+  ///   The informational message on the error.
+  /// @param [in] msg2
+  ///   Additional information on the error (optional).
+  /// @param [in] posix_code
+  ///   POSIX error code, if applicable (optional).
+  /// @return The error status of an appropriate type.
+  ///
+  ///@{
+  static Status NotFound(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kNotFound, msg, msg2, posix_code);
+  }
+  static Status Corruption(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kCorruption, msg, msg2, posix_code);
+  }
+  static Status NotSupported(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kNotSupported, msg, msg2, posix_code);
+  }
+  static Status InvalidArgument(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kInvalidArgument, msg, msg2, posix_code);
+  }
+  static Status IOError(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kIOError, msg, msg2, posix_code);
+  }
+  static Status AlreadyPresent(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kAlreadyPresent, msg, msg2, posix_code);
+  }
+  static Status RuntimeError(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kRuntimeError, msg, msg2, posix_code);
+  }
+  static Status NetworkError(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kNetworkError, msg, msg2, posix_code);
+  }
+  static Status IllegalState(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kIllegalState, msg, msg2, posix_code);
+  }
+  static Status NotAuthorized(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kNotAuthorized, msg, msg2, posix_code);
+  }
+  static Status Aborted(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kAborted, msg, msg2, posix_code);
+  }
+  static Status RemoteError(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kRemoteError, msg, msg2, posix_code);
+  }
+  static Status ServiceUnavailable(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kServiceUnavailable, msg, msg2, posix_code);
+  }
+  static Status TimedOut(const Slice& msg, const Slice& msg2 = Slice(),
+                         int16_t posix_code = -1) {
+    return Status(kTimedOut, msg, msg2, posix_code);
+  }
+  static Status Uninitialized(const Slice& msg, const Slice& msg2 = Slice(),
+                              int16_t posix_code = -1) {
+    return Status(kUninitialized, msg, msg2, posix_code);
+  }
+  static Status ConfigurationError(const Slice& msg, const Slice& msg2 = Slice(),
+                                   int16_t posix_code = -1) {
+    return Status(kConfigurationError, msg, msg2, posix_code);
+  }
+  // Previously declared with an int64_t posix_code, unlike every other
+  // factory; the value was silently narrowed to int16_t by the constructor.
+  static Status Incomplete(const Slice& msg, const Slice& msg2 = Slice(),
+                           int16_t posix_code = -1) {
+    return Status(kIncomplete, msg, msg2, posix_code);
+  }
+  static Status EndOfFile(const Slice& msg, const Slice& msg2 = Slice(),
+                          int16_t posix_code = -1) {
+    return Status(kEndOfFile, msg, msg2, posix_code);
+  }
+  ///@}
+
+  /// @return @c true iff the status indicates success.
+  bool ok() const { return (state_ == NULL); }
+
+  /// @return @c true iff the status indicates a NotFound error.
+  bool IsNotFound() const { return code() == kNotFound; }
+
+  /// @return @c true iff the status indicates a Corruption error.
+  bool IsCorruption() const { return code() == kCorruption; }
+
+  /// @return @c true iff the status indicates a NotSupported error.
+  bool IsNotSupported() const { return code() == kNotSupported; }
+
+  /// @return @c true iff the status indicates an IOError.
+  bool IsIOError() const { return code() == kIOError; }
+
+  /// @return @c true iff the status indicates an InvalidArgument error.
+  bool IsInvalidArgument() const { return code() == kInvalidArgument; }
+
+  /// @return @c true iff the status indicates an AlreadyPresent error.
+  bool IsAlreadyPresent() const { return code() == kAlreadyPresent; }
+
+  /// @return @c true iff the status indicates a RuntimeError.
+  bool IsRuntimeError() const { return code() == kRuntimeError; }
+
+  /// @return @c true iff the status indicates a NetworkError.
+  bool IsNetworkError() const { return code() == kNetworkError; }
+
+  /// @return @c true iff the status indicates an IllegalState error.
+  bool IsIllegalState() const { return code() == kIllegalState; }
+
+  /// @return @c true iff the status indicates a NotAuthorized error.
+  bool IsNotAuthorized() const { return code() == kNotAuthorized; }
+
+  /// @return @c true iff the status indicates an Aborted error.
+  bool IsAborted() const { return code() == kAborted; }
+
+  /// @return @c true iff the status indicates a RemoteError.
+  bool IsRemoteError() const { return code() == kRemoteError; }
+
+  /// @return @c true iff the status indicates ServiceUnavailable.
+  bool IsServiceUnavailable() const { return code() == kServiceUnavailable; }
+
+  /// @return @c true iff the status indicates TimedOut.
+  bool IsTimedOut() const { return code() == kTimedOut; }
+
+  /// @return @c true iff the status indicates Uninitialized.
+  bool IsUninitialized() const { return code() == kUninitialized; }
+
+  /// @return @c true iff the status indicates ConfigurationError.
+  bool IsConfigurationError() const { return code() == kConfigurationError; }
+
+  /// @return @c true iff the status indicates Incomplete.
+  bool IsIncomplete() const { return code() == kIncomplete; }
+
+  /// @return @c true iff the status indicates end of file.
+  bool IsEndOfFile() const { return code() == kEndOfFile; }
+
+  /// @return A string representation of this status suitable for printing.
+  ///   Returns the string "OK" for success.
+  std::string ToString() const;
+
+  /// @return A string representation of the status code, without the message
+  ///   text or POSIX code information.
+  std::string CodeAsString() const;
+
+  /// This is similar to ToString, except that it does not include
+  /// the stringified error code or POSIX code.
+  ///
+  /// @note The returned Slice is only valid as long as this Status object
+  ///   remains live and unchanged.
+  ///
+  /// @return The message portion of the Status. For @c OK statuses,
+  ///   this returns an empty string.
+  Slice message() const;
+
+  /// @return The POSIX code associated with this Status object, or @c -1
+  ///   if the error was created without one. An @c OK status reports @c 0.
+  int16_t posix_code() const;
+
+  /// Clone the object and add the specified prefix to the clone's message.
+  ///
+  /// @param [in] msg
+  ///   The message to prepend.
+  /// @return A new Status object with the same state plus an additional
+  ///   leading message.
+  Status CloneAndPrepend(const Slice& msg) const;
+
+  /// Clone the object and add the specified suffix to the clone's message.
+  ///
+  /// @param [in] msg
+  ///   The message to append.
+  /// @return A new Status object with the same state plus an additional
+  ///   trailing message.
+  Status CloneAndAppend(const Slice& msg) const;
+
+  /// @return The memory usage of this object without the object itself.
+  ///   Should be used when embedded inside another object.
+  size_t memory_footprint_excluding_this() const;
+
+  /// @return The memory usage of this object including the object itself.
+  ///   Should be used when allocated on the heap.
+  size_t memory_footprint_including_this() const;
+
+ private:
+  // OK status has a NULL state_.  Otherwise, state_ is a new[] array
+  // of the following form:
+  //    state_[0..3] == length of message
+  //    state_[4]    == code
+  //    state_[5..6] == posix_code
+  //    state_[7..]  == message
+  const char* state_;
+
+  enum Code {
+    kOk = 0,
+    kNotFound = 1,
+    kCorruption = 2,
+    kNotSupported = 3,
+    kInvalidArgument = 4,
+    kIOError = 5,
+    kAlreadyPresent = 6,
+    kRuntimeError = 7,
+    kNetworkError = 8,
+    kIllegalState = 9,
+    kNotAuthorized = 10,
+    kAborted = 11,
+    kRemoteError = 12,
+    kServiceUnavailable = 13,
+    kTimedOut = 14,
+    kUninitialized = 15,
+    kConfigurationError = 16,
+    kIncomplete = 17,
+    kEndOfFile = 18,
+    // NOTE: Remember to duplicate these constants into wire_protocol.proto
+    // and to add StatusTo/FromPB ser/deser cases in wire_protocol.cc !
+    // Also remember to make the same changes to the java client in Status.java.
+    //
+    // TODO: Move error codes into an error_code.proto or something similar.
+  };
+  COMPILE_ASSERT(sizeof(Code) == 4, code_enum_size_is_part_of_abi);
+
+  // Decodes the code byte (state_[4]); an OK status has no buffer and is kOk.
+  Code code() const {
+    return (state_ == NULL) ? kOk : static_cast<Code>(state_[4]);
+  }
+
+  Status(Code code, const Slice& msg, const Slice& msg2, int16_t posix_code);
+
+  // Returns a freshly allocated copy of the encoded state buffer 's'.
+  static const char* CopyState(const char* s);
+};
+
+inline Status::Status(const Status& s) {
+  state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_);
+}
+
+inline Status& Status::operator=(const Status& s) {
+  // The following condition catches both aliasing (when this == &s),
+  // and the common case where both s and *this are OK.
+  if (state_ != s.state_) {
+    // Build the copy before releasing our own buffer. If CopyState() throws
+    // (presumably std::bad_alloc from new[]), *this is left untouched instead
+    // of holding a freed pointer that the destructor would delete again.
+    const char* new_state = (s.state_ == NULL) ? NULL : CopyState(s.state_);
+    delete[] state_;
+    state_ = new_state;
+  }
+  return *this;
+}
+
+#if __cplusplus >= 201103L
+// Steals the state buffer; the moved-from object becomes an OK status.
+inline Status::Status(Status&& s)
+    : state_(s.state_) {
+  s.state_ = nullptr;
+}
+
+// Releases our buffer and takes ownership of the source's buffer.
+inline Status& Status::operator=(Status&& s) {
+  if (state_ == s.state_) {
+    // Self-move, or both objects are already OK: nothing to transfer.
+    return *this;
+  }
+  delete[] state_;
+  state_ = s.state_;
+  s.state_ = nullptr;
+  return *this;
+}
+#endif
+
+}  // namespace kudu
+
+#endif  // KUDU_UTIL_STATUS_H_

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/status_callback.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/status_callback.cc b/be/src/kudu/util/status_callback.cc
new file mode 100644
index 0000000..667bfec
--- /dev/null
+++ b/be/src/kudu/util/status_callback.cc
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/status_callback.h"
+
+#include "kudu/util/status.h"
+
+using std::string;
+
+namespace kudu {
+
+// No-op StatusCallback target; the status is intentionally ignored.
+void DoNothingStatusCB(const Status& status) {
+}
+
+// Terminates via LOG(FATAL), prefixing the status with 'message', when the
+// supplied status is not OK.
+void CrashIfNotOkStatusCB(const string& message, const Status& status) {
+  const bool failed = !status.ok();
+  if (PREDICT_FALSE(failed)) {
+    LOG(FATAL) << message << ": " << status.ToString();
+  }
+}
+
+// No-op StatusClosure target that always reports success.
+Status DoNothingStatusClosure() {
+  return Status::OK();
+}
+
+} // end namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/status_callback.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/status_callback.h b/be/src/kudu/util/status_callback.h
new file mode 100644
index 0000000..3a36b83
--- /dev/null
+++ b/be/src/kudu/util/status_callback.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_STATUS_CALLBACK_H
+#define KUDU_UTIL_STATUS_CALLBACK_H
+
+#include <string>
+
+#include "kudu/gutil/callback_forward.h"
+
+namespace kudu {
+
+class Status;
+
+// Callback invoked with the Status of an operation. It is typically handed to
+// functions that produce their result asynchronously and may fail.
+typedef Callback<void(const Status& status)> StatusCallback;
+
+// Argument-less callback whose Status result tells whether it succeeded.
+typedef Callback<Status(void)> StatusClosure;
+
+// StatusCallback target for call sites whose signature demands a callback
+// even though the caller does not need one.
+void DoNothingStatusCB(const Status& status);
+
+// StatusCallback target that crashes with a FATAL log message if the given
+// Status is not OK.
+void CrashIfNotOkStatusCB(const std::string& message, const Status& status);
+
+// Default StatusClosure for places where supplying one is optional.
+Status DoNothingStatusClosure();
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/stopwatch.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/stopwatch.h b/be/src/kudu/util/stopwatch.h
new file mode 100644
index 0000000..e86d90c
--- /dev/null
+++ b/be/src/kudu/util/stopwatch.h
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_STOPWATCH_H
+#define KUDU_UTIL_STOPWATCH_H
+
+#include <glog/logging.h>
+#include <sys/resource.h>
+#include <sys/time.h>
+#include <time.h>
+#include <string>
+#if defined(__APPLE__)
+#include <mach/clock.h>
+#include <mach/mach.h>
+#endif  // defined(__APPLE__)
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/walltime.h"
+
+namespace kudu {
+
+// Macro for logging timing of a block. Usage:
+//   LOG_TIMING_PREFIX_IF(INFO, FLAGS_should_record_time, "Tablet X: ", "doing some task") {
+//     ... some task which takes some time
+//   }
+// If FLAGS_should_record_time is true, yields a log like:
+// I1102 14:35:51.726186 23082 file.cc:167] Tablet X: Time spent doing some task:
+//   real 3.729s user 3.570s sys 0.150s
+// The task will always execute regardless of whether the timing information is
+// printed.
+//
+// The for-loop runs its body exactly once: LogTiming::HasRun() is false on
+// entry and MarkHasRun() sets it after the body. The timing is logged when the
+// loop variable is destroyed at the end of the loop.
+#define LOG_TIMING_PREFIX_IF(severity, condition, prefix, description) \
+  for (kudu::sw_internal::LogTiming _l(__FILE__, __LINE__, google::severity, prefix, description, \
+          -1, (condition)); !_l.HasRun(); _l.MarkHasRun())
+
+// Conditionally log, no prefix.
+#define LOG_TIMING_IF(severity, condition, description) \
+  LOG_TIMING_PREFIX_IF(severity, (condition), "", (description))
+
+// Always log, including prefix.
+#define LOG_TIMING_PREFIX(severity, prefix, description) \
+  LOG_TIMING_PREFIX_IF(severity, true, (prefix), (description))
+
+// Always log, no prefix.
+#define LOG_TIMING(severity, description) \
+  LOG_TIMING_IF(severity, true, (description))
+
+// Macro to log the time spent in the rest of the block.
+// VARNAME_LINENUM is defined outside this file; presumably it yields a
+// line-unique variable name so several scoped timers can share a scope.
+// NOTE(review): unlike SCOPED_LOG_SLOW_EXECUTION, this expansion ends with
+// its own ';' (as does SCOPED_VLOG_TIMING), so call sites add a redundant one.
+#define SCOPED_LOG_TIMING(severity, description) \
+  kudu::sw_internal::LogTiming VARNAME_LINENUM(_log_timing)(__FILE__, __LINE__, \
+      google::severity, "", description, -1, true);
+
+// Scoped version of LOG_SLOW_EXECUTION().
+#define SCOPED_LOG_SLOW_EXECUTION(severity, max_expected_millis, description) \
+  kudu::sw_internal::LogTiming VARNAME_LINENUM(_log_timing)(__FILE__, __LINE__, \
+      google::severity, "", description, max_expected_millis, true)
+
+// Scoped version of LOG_SLOW_EXECUTION() but with a prefix.
+#define SCOPED_LOG_SLOW_EXECUTION_PREFIX(severity, max_expected_millis, prefix, description) \
+  kudu::sw_internal::LogTiming VARNAME_LINENUM(_log_timing)(__FILE__, __LINE__, \
+      google::severity, prefix, description, max_expected_millis, true)
+
+// Macro for logging timing of a block. Usage:
+//   LOG_SLOW_EXECUTION(INFO, 5, "doing some task") {
+//     ... some task which takes some time
+//   }
+// when slower than 5 milliseconds, yields a log like:
+// I1102 14:35:51.726186 23082 file.cc:167] Time spent doing some task:
+//   real 3.729s user 3.570s sys 0.150s
+#define LOG_SLOW_EXECUTION(severity, max_expected_millis, description) \
+  for (kudu::sw_internal::LogTiming _l(__FILE__, __LINE__, google::severity, "", description, \
+          max_expected_millis, true); !_l.HasRun(); _l.MarkHasRun())
+
+// Macro for vlogging timing of a block. The execution happens regardless of the vlog_level,
+// it's only the logging that's affected.
+// Usage:
+//   VLOG_TIMING(1, "doing some task") {
+//     ... some task which takes some time
+//   }
+// Yields a log just like LOG_TIMING's.
+#define VLOG_TIMING(vlog_level, description) \
+  for (kudu::sw_internal::LogTiming _l(__FILE__, __LINE__, google::INFO, "", description, \
+          -1, VLOG_IS_ON(vlog_level)); !_l.HasRun(); _l.MarkHasRun())
+
+// Macro to log the time spent in the rest of the block.
+#define SCOPED_VLOG_TIMING(vlog_level, description) \
+  kudu::sw_internal::LogTiming VARNAME_LINENUM(_log_timing)(__FILE__, __LINE__, \
+      google::INFO, "", description, -1, VLOG_IS_ON(vlog_level));
+
+// Conversion factors used by CpuTimes. They are double literals so that the
+// divisions there produce fractional seconds/milliseconds.
+#define NANOS_PER_SECOND 1000000000.0
+#define NANOS_PER_MILLISECOND 1000000.0
+
+class Stopwatch;
+
+// Durations below are unsigned 64-bit counts of nanoseconds.
+typedef uint64_t nanosecond_type;
+
+// Elapsed wall-clock, user-CPU and system-CPU time (each in nanoseconds),
+// plus the number of context switches, over some measured interval.
+struct CpuTimes {
+  nanosecond_type wall;
+  nanosecond_type user;
+  nanosecond_type system;
+  int64_t context_switches;
+
+  // Resets every counter to zero.
+  void clear() {
+    wall = 0;
+    user = 0;
+    system = 0;
+    context_switches = 0;
+  }
+
+  // Formats the times like the output of the "time" shell command.
+  // The context-switch count is not included.
+  std::string ToString() const {
+    const double real_secs = wall_seconds();
+    const double user_secs = user_cpu_seconds();
+    const double sys_secs = system_cpu_seconds();
+    return StringPrintf("real %.3fs\tuser %.3fs\tsys %.3fs",
+                        real_secs, user_secs, sys_secs);
+  }
+
+  // Unit conversions of the raw nanosecond counters.
+  double wall_millis() const { return static_cast<double>(wall) / NANOS_PER_MILLISECOND; }
+  double wall_seconds() const { return static_cast<double>(wall) / NANOS_PER_SECOND; }
+  double user_cpu_seconds() const { return static_cast<double>(user) / NANOS_PER_SECOND; }
+  double system_cpu_seconds() const { return static_cast<double>(system) / NANOS_PER_SECOND; }
+};
+
+// A Stopwatch is a convenient way of timing a given operation.
+//
+// Wall clock time is based on a monotonic timer, so can be reliably used for
+// determining durations.
+// CPU time is based on either current thread's usage or the usage of the whole
+// process, depending on the value of 'Mode' passed to the constructor.
+//
+// The implementation relies on several syscalls, so should not be used for
+// hot paths, but is useful for timing anything on the granularity of seconds
+// or more.
+//
+// NOTE: the user time reported by this class is based on Linux scheduler ticks
+// and thus has low precision. Use GetThreadCpuTimeMicros() from walltime.h if
+// more accurate per-thread CPU usage timing is required.
+class Stopwatch {
+ public:
+
+  enum Mode {
+    // Collect usage only about the calling thread.
+    // This may not be supported on older versions of Linux.
+    THIS_THREAD,
+    // Collect usage of all threads.
+    ALL_THREADS
+  };
+
+  // Construct a new stopwatch. The stopwatch is initially stopped.
+  explicit Stopwatch(Mode mode = THIS_THREAD)
+    : stopped_(true),
+      mode_(mode) {
+    times_.clear();
+  }
+
+  // Start counting. If the stopwatch is already counting, then resets the
+  // start point at the current time.
+  // While running, times_ holds the snapshot taken here.
+  void start() {
+    stopped_ = false;
+    GetTimes(&times_);
+  }
+
+  // Stop counting. If the stopwatch is already stopped, has no effect.
+  // Once stopped, times_ holds the elapsed deltas instead of a snapshot.
+  void stop() {
+    if (stopped_) return;
+    stopped_ = true;
+
+    CpuTimes current;
+    GetTimes(&current);
+    times_.wall = current.wall - times_.wall;
+    times_.user = current.user - times_.user;
+    times_.system = current.system - times_.system;
+    times_.context_switches = current.context_switches - times_.context_switches;
+  }
+
+  // Return the elapsed amount of time. If the stopwatch is running, then returns
+  // the amount of time since it was started. If it is stopped, returns the amount
+  // of time between the most recent start/stop pair. If the stopwatch has never been
+  // started, the elapsed time is considered to be zero.
+  CpuTimes elapsed() const {
+    if (stopped_) return times_;
+
+    CpuTimes current;
+    GetTimes(&current);
+    current.wall -= times_.wall;
+    current.user -= times_.user;
+    current.system -= times_.system;
+    current.context_switches -= times_.context_switches;
+    return current;
+  }
+
+  // Resume a stopped stopwatch, such that the elapsed time continues to grow from
+  // the point where it was last stopped.
+  // For example:
+  //   Stopwatch s;
+  //   s.start();
+  //   sleep(1); // elapsed() is now ~1sec
+  //   s.stop();
+  //   sleep(1);
+  //   s.resume();
+  //   sleep(1); // elapsed() is now ~2sec
+  void resume() {
+    if (!stopped_) return;
+
+    // Back-date the new start snapshot by the previously accumulated deltas.
+    CpuTimes current(times_);
+    start();
+    times_.wall   -= current.wall;
+    times_.user   -= current.user;
+    times_.system -= current.system;
+    times_.context_switches -= current.context_switches;
+  }
+
+  bool is_stopped() const {
+    return stopped_;
+  }
+
+ private:
+  // Fills 'times' with absolute readings: monotonic wall time plus the user
+  // and system CPU time and context-switch count for the selected mode.
+  void GetTimes(CpuTimes *times) const {
+    struct rusage usage;
+    struct timespec wall;
+
+#if defined(__APPLE__)
+    if (mode_ == THIS_THREAD) {
+      // Adapted from http://blog.kuriositaet.de/?p=257.
+      // The TASK_THREAD_TIMES_INFO flavor fills a task_thread_times_info
+      // structure, so the buffer and its count must match that flavor.
+      // Reading the result through a task_basic_info picked up user_time and
+      // system_time from the wrong offsets.
+      // NOTE(review): this queries mach_task_self(), i.e. task-wide times, so
+      // on macOS THIS_THREAD is not actually per-thread -- confirm whether
+      // thread_info() should be used instead.
+      struct task_thread_times_info t_info;
+      mach_msg_type_number_t t_info_count = TASK_THREAD_TIMES_INFO_COUNT;
+      CHECK_EQ(KERN_SUCCESS, task_info(mach_task_self(), TASK_THREAD_TIMES_INFO,
+                                       reinterpret_cast<task_info_t>(&t_info),
+                                       &t_info_count));
+      usage.ru_utime.tv_sec = t_info.user_time.seconds;
+      usage.ru_utime.tv_usec = t_info.user_time.microseconds;
+      usage.ru_stime.tv_sec = t_info.system_time.seconds;
+      usage.ru_stime.tv_usec = t_info.system_time.microseconds;
+    } else {
+      CHECK_EQ(0, getrusage(RUSAGE_SELF, &usage));
+    }
+
+    mach_timespec_t ts;
+    walltime_internal::GetCurrentTime(&ts);
+    wall.tv_sec = ts.tv_sec;
+    wall.tv_nsec = ts.tv_nsec;
+#else
+    CHECK_EQ(0, getrusage((mode_ == THIS_THREAD) ? RUSAGE_THREAD : RUSAGE_SELF, &usage));
+    CHECK_EQ(0, clock_gettime(CLOCK_MONOTONIC, &wall));
+#endif  // defined(__APPLE__)
+    // Convert in unsigned 64-bit arithmetic. With a 32-bit 'long', the former
+    // 'tv_sec * 1000000000L' overflowed (undefined behavior) once tv_sec > 2.
+    const nanosecond_type kNanosPerSec = 1000000000ULL;
+    const nanosecond_type kNanosPerMicro = 1000ULL;
+    times->wall   = static_cast<nanosecond_type>(wall.tv_sec) * kNanosPerSec +
+                    static_cast<nanosecond_type>(wall.tv_nsec);
+    times->user   = static_cast<nanosecond_type>(usage.ru_utime.tv_sec) * kNanosPerSec +
+                    static_cast<nanosecond_type>(usage.ru_utime.tv_usec) * kNanosPerMicro;
+    times->system = static_cast<nanosecond_type>(usage.ru_stime.tv_sec) * kNanosPerSec +
+                    static_cast<nanosecond_type>(usage.ru_stime.tv_usec) * kNanosPerMicro;
+    times->context_switches = usage.ru_nvcsw + usage.ru_nivcsw;
+  }
+
+  bool stopped_;
+
+  CpuTimes times_;
+  Mode mode_;
+};
+
+
+namespace sw_internal {
+
+// Internal class used by the LOG_TIMING family of macros.
+// It starts a Stopwatch on construction. If 'should_print' was true, its
+// destructor logs the elapsed time when the wall time exceeds
+// 'max_expected_millis'; a negative value means "always log".
+class LogTiming {
+ public:
+  LogTiming(const char *file, int line, google::LogSeverity severity,
+            std::string prefix, std::string description,
+            int64_t max_expected_millis, bool should_print)
+      : file_(file),
+        line_(line),
+        severity_(severity),
+        prefix_(std::move(prefix)),
+        description_(std::move(description)),
+        max_expected_millis_(max_expected_millis),
+        should_print_(should_print),
+        has_run_(false) {
+    stopwatch_.start();
+  }
+
+  ~LogTiming() {
+    if (should_print_) {
+      Print(max_expected_millis_);
+    }
+  }
+
+  // Allows this object to be used as the loop variable in for-loop macros.
+  // Call HasRun() in the conditional check in the for-loop.
+  bool HasRun() const {
+    return has_run_;
+  }
+
+  // Allows this object to be used as the loop variable in for-loop macros.
+  // Call MarkHasRun() in the "increment" section of the for-loop.
+  void MarkHasRun() {
+    has_run_ = true;
+  }
+
+ private:
+  Stopwatch stopwatch_;
+  const char *file_;
+  const int line_;
+  const google::LogSeverity severity_;
+  // Spelled std::string explicitly: the unqualified 'string' only compiled
+  // thanks to a using-declaration leaked from some other header.
+  const std::string prefix_;
+  const std::string description_;
+  const int64_t max_expected_millis_;
+  const bool should_print_;
+  bool has_run_;
+
+  // Print if the number of expected millis exceeds the max.
+  // Passing a negative number implies "always print".
+  void Print(int64_t max_expected_millis) {
+    stopwatch_.stop();
+    CpuTimes times = stopwatch_.elapsed();
+    if (times.wall_millis() > max_expected_millis) {
+      google::LogMessage(file_, line_, severity_).stream()
+        << prefix_ << "Time spent " << description_ << ": "
+        << times.ToString();
+    }
+  }
+
+};
+
+} // namespace sw_internal
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/string_case-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/string_case-test.cc b/be/src/kudu/util/string_case-test.cc
new file mode 100644
index 0000000..ae166f5
--- /dev/null
+++ b/be/src/kudu/util/string_case-test.cc
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/string_case.h"
+
+using std::string;
+
+namespace kudu {
+
+TEST(TestStringCase, TestSnakeToCamel) {
+  // '_' and '-' both separate words; input with no separator only gets its
+  // first letter upper-cased.
+  struct Case {
+    const char* snake;
+    const char* camel;
+  };
+  const Case kCases[] = {
+    { "foo_bar", "FooBar" },
+    { "foo-bar", "FooBar" },
+    { "foobar", "Foobar" },
+  };
+  string out;
+  for (const Case& c : kCases) {
+    SnakeToCamelCase(c.snake, &out);
+    ASSERT_EQ(c.camel, out);
+  }
+}
+
+TEST(TestStringCase, TestToUpperCase) {
+  // Letters are upper-cased; spaces and punctuation pass through untouched.
+  struct Case {
+    const char* in;
+    const char* expected;
+  };
+  const Case kCases[] = {
+    { "foo", "FOO" },
+    { "foo bar-BaZ", "FOO BAR-BAZ" },
+  };
+  string out;
+  for (const Case& c : kCases) {
+    ToUpperCase(string(c.in), &out);
+    ASSERT_EQ(c.expected, out);
+  }
+}
+
+TEST(TestStringCase, TestToUpperCaseInPlace) {
+  // The same string may serve as both input and output.
+  string s("foo");
+  ToUpperCase(s, &s);
+  ASSERT_EQ("FOO", s);
+}
+
+TEST(TestStringCase, TestCapitalize) {
+  // First character upper-cased, every remaining character lower-cased.
+  struct Case {
+    const char* in;
+    const char* expected;
+  };
+  const Case kCases[] = {
+    { "foo", "Foo" },
+    { "HiBerNATe", "Hibernate" },
+  };
+  for (const Case& c : kCases) {
+    string word = c.in;
+    Capitalize(&word);
+    ASSERT_EQ(c.expected, word);
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/string_case.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/string_case.cc b/be/src/kudu/util/string_case.cc
new file mode 100644
index 0000000..141cdc5
--- /dev/null
+++ b/be/src/kudu/util/string_case.cc
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/string_case.h"
+
+#include <glog/logging.h>
+#include <ctype.h>
+
+namespace kudu {
+
+using std::string;
+
+// Converts 'snake_case' (words separated by '_' or '-') to CamelCase in
+// '*camel_case', replacing its previous contents. Separators are dropped; the
+// first character and each character following a separator are upper-cased,
+// and all others are copied unchanged. Must not be called in place.
+void SnakeToCamelCase(const std::string &snake_case,
+                      std::string *camel_case) {
+  DCHECK_NE(camel_case, &snake_case) << "Does not support in-place operation";
+  camel_case->clear();
+  camel_case->reserve(snake_case.size());
+
+  bool uppercase_next = true;
+  for (char c : snake_case) {
+    if ((c == '_') ||
+        (c == '-')) {
+      uppercase_next = true;
+      continue;
+    }
+    if (uppercase_next) {
+      // toupper() is undefined for negative arguments other than EOF, and
+      // bytes >= 0x80 are negative when char is signed, so go through
+      // unsigned char.
+      camel_case->push_back(
+          static_cast<char>(toupper(static_cast<unsigned char>(c))));
+    } else {
+      camel_case->push_back(c);
+    }
+    uppercase_next = false;
+  }
+}
+
+// Writes an upper-cased copy of 'string' into '*out'. 'out' may point at
+// 'string' itself, in which case the conversion happens in place.
+void ToUpperCase(const std::string &string,
+                 std::string *out) {
+  if (out != &string) {
+    *out = string;
+  }
+
+  for (char& c : *out) {
+    // Cast through unsigned char: toupper() is undefined for negative values
+    // other than EOF, which plain (signed) char produces for bytes >= 0x80.
+    c = static_cast<char>(toupper(static_cast<unsigned char>(c)));
+  }
+}
+
+// Capitalizes '*word' in place: upper-cases its first character and
+// lower-cases all the others. An empty string is left unchanged.
+void Capitalize(std::string *word) {
+  if (word->empty()) {
+    return;
+  }
+  std::string &w = *word;
+
+  // Cast through unsigned char: toupper()/tolower() are undefined for negative
+  // values other than EOF, which signed char yields for bytes >= 0x80.
+  w[0] = static_cast<char>(toupper(static_cast<unsigned char>(w[0])));
+
+  // size_t index: avoids truncating size() to 32 bits and a signed/unsigned
+  // comparison.
+  for (size_t i = 1; i < w.size(); i++) {
+    w[i] = static_cast<char>(tolower(static_cast<unsigned char>(w[i])));
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/string_case.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/string_case.h b/be/src/kudu/util/string_case.h
new file mode 100644
index 0000000..98f5828
--- /dev/null
+++ b/be/src/kudu/util/string_case.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Utility methods for dealing with string case.
+#ifndef KUDU_UTIL_STRING_CASE_H
+#define KUDU_UTIL_STRING_CASE_H
+
+#include <string>
+
+namespace kudu {
+
+// Convert the given snake_case string to camel case.
+// Also treats '-' in a string like a '_'
+// For example:
+// - 'foo_bar' -> FooBar
+// - 'foo-bar' -> FooBar
+//
+// Separators are dropped. The first character and each character after a
+// separator are upper-cased; every other character is copied unchanged,
+// e.g. 'fooBar_baz' -> 'FooBarBaz'. Any previous contents of 'camel_case'
+// are replaced.
+//
+// This function cannot operate in-place -- i.e. 'camel_case' must not
+// point to 'snake_case'.
+void SnakeToCamelCase(const std::string &snake_case,
+                      std::string *camel_case);
+
+// Upper-case all of the characters in the given string.
+// 'string' and 'out' may refer to the same string to replace in-place.
+// Case mapping uses the C library's toupper(), so the handling of non-ASCII
+// bytes depends on the current C locale.
+void ToUpperCase(const std::string &string,
+                 std::string *out);
+
+// Capitalizes a string containing a word in place: the first character is
+// upper-cased and all remaining characters are lower-cased. An empty string
+// is left unchanged.
+// For example:
+// - 'hiBerNATe' -> 'Hibernate'
+void Capitalize(std::string *word);
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/striped64-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/striped64-test.cc b/be/src/kudu/util/striped64-test.cc
new file mode 100644
index 0000000..fee07ca
--- /dev/null
+++ b/be/src/kudu/util/striped64-test.cc
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <memory>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/striped64.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+
+// These flags are used by the multi-threaded tests and can also be raised for
+// microbenchmarking.
+// Number of Increment() calls each incrementer thread makes (and Decrement()
+// calls each decrementer thread makes).
+DEFINE_int32(num_operations, 10*1000, "Number of operations to perform");
+// Number of incrementer threads, and then decrementer threads, used by
+// TestMultiIncrDecr.
+DEFINE_int32(num_threads, 2, "Number of worker threads");
+
+namespace kudu {
+
+// Test some basic operations: IncrementBy (positive and negative), Increment,
+// Decrement and Reset, checking Value() after each step. Assertions are
+// written expected-first, like the rest of this file.
+TEST(Striped64Test, TestBasic) {
+  LongAdder adder;
+  ASSERT_EQ(0, adder.Value());
+  adder.IncrementBy(100);
+  ASSERT_EQ(100, adder.Value());
+  adder.Increment();
+  ASSERT_EQ(101, adder.Value());
+  adder.Decrement();
+  ASSERT_EQ(100, adder.Value());
+  adder.IncrementBy(-200);
+  ASSERT_EQ(-100, adder.Value());
+  adder.Reset();
+  ASSERT_EQ(0, adder.Value());
+}
+
+// Drives an Adder (LongAdder or BasicAdder) from several threads: first all
+// threads increment it, then all threads decrement it back to zero.
+template <class Adder>
+class MultiThreadTest {
+ public:
+  typedef std::vector<scoped_refptr<Thread> > thread_vec_t;
+
+  MultiThreadTest(int64_t num_operations, int64_t num_threads)
+   :  num_operations_(num_operations),
+      num_threads_(num_threads) {
+  }
+
+  // Calls Increment() on adder_ 'num' times. The 64-bit index matches the
+  // type of 'num', so counts above INT_MAX cannot overflow the loop counter.
+  void IncrementerThread(const int64_t num) {
+    for (int64_t i = 0; i < num; i++) {
+      adder_.Increment();
+    }
+  }
+
+  // Calls Decrement() on adder_ 'num' times.
+  void DecrementerThread(const int64_t num) {
+    for (int64_t i = 0; i < num; i++) {
+      adder_.Decrement();
+    }
+  }
+
+  // Starts num_threads_ incrementer threads, joins them and checks the total,
+  // then does the same with decrementer threads and checks the adder is back
+  // at zero.
+  void Run() {
+    // Increment
+    for (int64_t i = 0; i < num_threads_; i++) {
+      scoped_refptr<Thread> ref;
+      Status s = Thread::Create("Striped64", "Incrementer",
+                                &MultiThreadTest::IncrementerThread, this,
+                                num_operations_, &ref);
+      // Fail the test with the error rather than ignoring the returned Status.
+      ASSERT_TRUE(s.ok()) << s.ToString();
+      threads_.push_back(ref);
+    }
+    for (const scoped_refptr<Thread> &t : threads_) {
+      t->Join();
+    }
+    ASSERT_EQ(num_threads_*num_operations_, adder_.Value());
+    threads_.clear();
+
+    // Decrement back to zero
+    for (int64_t i = 0; i < num_threads_; i++) {
+      scoped_refptr<Thread> ref;
+      Status s = Thread::Create("Striped64", "Decrementer",
+                                &MultiThreadTest::DecrementerThread, this,
+                                num_operations_, &ref);
+      ASSERT_TRUE(s.ok()) << s.ToString();
+      threads_.push_back(ref);
+    }
+    for (const scoped_refptr<Thread> &t : threads_) {
+      t->Join();
+    }
+    ASSERT_EQ(0, adder_.Value());
+  }
+
+  Adder adder_;
+
+  // Operations performed by each thread in each phase.
+  int64_t num_operations_;
+  // Number of threads started in each phase. Kept as int64_t to match the
+  // constructor parameter (no narrowing).
+  int64_t num_threads_;
+  thread_vec_t threads_;
+};
+
+// Baseline adder for comparison with LongAdder: every update goes through a
+// single shared AtomicInt<int64_t>.
+class BasicAdder {
+ public:
+  BasicAdder()
+      : value_(0) {
+  }
+
+  void IncrementBy(int64_t delta) {
+    value_.IncrementBy(delta);
+  }
+
+  inline void Increment() {
+    IncrementBy(1);
+  }
+
+  inline void Decrement() {
+    IncrementBy(-1);
+  }
+
+  int64_t Value() {
+    return value_.Load();
+  }
+
+ private:
+  AtomicInt<int64_t> value_;
+};
+
+// Runs the same increment/decrement workload first against BasicAdder and then
+// against LongAdder, logging how long each took.
+void RunMultiTest(int64_t num_operations, int64_t num_threads) {
+  MonoTime basic_begin = MonoTime::Now();
+  MultiThreadTest<BasicAdder> basic_run(num_operations, num_threads);
+  basic_run.Run();
+
+  MonoTime striped_begin = MonoTime::Now();
+  MultiThreadTest<LongAdder> striped_run(num_operations, num_threads);
+  striped_run.Run();
+  MonoTime striped_finish = MonoTime::Now();
+
+  MonoDelta basic_elapsed = striped_begin - basic_begin;
+  MonoDelta striped_elapsed = striped_finish - striped_begin;
+  LOG(INFO) << "Basic counter took   " << basic_elapsed.ToMilliseconds() << "ms.";
+  LOG(INFO) << "Striped counter took " << striped_elapsed.ToMilliseconds() << "ms.";
+}
+
+// Compare a single-thread workload. Demonstrates the overhead of LongAdder over
+// AtomicInt. The operation count is scaled 100x through
+// OverrideFlagForSlowTests (presumably only in slow-test mode).
+TEST(Striped64Test, TestSingleIncrDecr) {
+  const std::string scaled_ops =
+      strings::Substitute("$0", (FLAGS_num_operations * 100));
+  OverrideFlagForSlowTests("num_operations", scaled_ops);
+  RunMultiTest(FLAGS_num_operations, 1);
+}
+
+// Compare a multi-threaded workload. LongAdder should show improvements here.
+// Operations are scaled 100x and threads 4x through OverrideFlagForSlowTests
+// (presumably only in slow-test mode).
+TEST(Striped64Test, TestMultiIncrDecr) {
+  const std::string scaled_ops =
+      strings::Substitute("$0", (FLAGS_num_operations * 100));
+  OverrideFlagForSlowTests("num_operations", scaled_ops);
+
+  const std::string scaled_threads =
+      strings::Substitute("$0", (FLAGS_num_threads * 4));
+  OverrideFlagForSlowTests("num_threads", scaled_threads);
+
+  RunMultiTest(FLAGS_num_operations, FLAGS_num_threads);
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/striped64.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/striped64.cc b/be/src/kudu/util/striped64.cc
new file mode 100644
index 0000000..8343177
--- /dev/null
+++ b/be/src/kudu/util/striped64.cc
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/striped64.h"
+
+#include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
+#include "kudu/util/threadlocal.h"
+
+using kudu::striped64::internal::Cell;
+
+namespace kudu {
+
+namespace striped64 {
+namespace internal {
+
+//
+// Cell
+//
+
+// A freshly constructed Cell holds zero.
+Cell::Cell() : value_(0) {}
+} // namespace internal
+} // namespace striped64
+
+//
+// Striped64
+//
+__thread uint64_t Striped64::tls_hashcode_ = 0;
+const uint32_t Striped64::kNumCpus = sysconf(_SC_NPROCESSORS_ONLN);
+
+// Starts with no cell table (num_cells_ == 0), so updates go to base_ until
+// a CAS on it fails and RetryUpdate() builds the table.
+Striped64::Striped64()
+    : busy_(false), cell_buffer_(nullptr), cells_(nullptr), num_cells_(0) {}
+
+// Returns this thread's hashcode, lazily seeding it from the monotonic clock
+// the first time the thread asks.
+uint64_t Striped64::get_tls_hashcode() {
+  if (PREDICT_FALSE(tls_hashcode_ == 0)) {
+    const MonoDelta since_min = MonoTime::Now() - MonoTime::Min();
+    Random rng(since_min.ToNanoseconds());
+    uint64_t fresh = rng.Next64();
+    // Zero is reserved: it marks "unset" in the check above, and the xorshift
+    // rehash in RetryUpdate() would keep mapping 0 to 0.
+    if (fresh == 0) {
+      fresh = 1;
+    }
+    tls_hashcode_ = fresh;
+  }
+  return tls_hashcode_;
+}
+
+
+Striped64::~Striped64() {
+  // The Cells were placement-new'd into cell_buffer_ by RetryUpdate() and are
+  // treated as POD, so no per-Cell destructor is run; only the
+  // posix_memalign() buffer is released. It is nullptr if the table was never
+  // built, and free(nullptr) is a no-op.
+  ::free(cell_buffer_);
+}
+
+void Striped64::RetryUpdate(int64_t x, Rehash to_rehash) {
+  uint64_t h = get_tls_hashcode();
+  // There are three operations in this loop.
+  //
+  // 1. Try to add to the Cell hash table entry for the thread if the table exists.
+  //    When there's contention, rehash to try a different Cell.
+  // 2. Try to initialize the hash table.
+  // 3. Try to update the base counter.
+  //
+  // These are predicated on successful CAS operations, which is why it's all wrapped in an
+  // infinite retry loop.
+  while (true) {
+    int32_t n = base::subtle::Acquire_Load(&num_cells_);
+    if (n > 0) {
+      if (to_rehash == kRehash) {
+        // CAS failed already, rehash before trying to increment.
+        to_rehash = kNoRehash;
+      } else {
+        Cell *cell = &(cells_[(n - 1) & h]);
+        int64_t v = cell->value_.Load();
+        if (cell->CompareAndSet(v, Fn(v, x))) {
+          // Successfully CAS'd the corresponding cell, done.
+          break;
+        }
+      }
+      // Rehash since we failed to CAS, either previously or just now.
+      h ^= h << 13;
+      h ^= h >> 17;
+      h ^= h << 5;
+    } else if (n == 0 && CasBusy()) {
+      // We think table hasn't been initialized yet, try to do so.
+      // Recheck preconditions, someone else might have init'd in the meantime.
+      n = base::subtle::Acquire_Load(&num_cells_);
+      if (n == 0) {
+        // Largest table we will build: 1 << 30 is the biggest power of two an
+        // int32_t holds, so doubling 'n' up to it cannot overflow.
+        const uint32_t kMaxCells = 1U << 30;
+        // kNumCpus may come straight from sysconf(), whose -1 error return
+        // wraps to 4294967295 in a uint32_t. An unbounded doubling loop would
+        // then overflow 'n', reach 0 and spin forever while holding busy_.
+        // Treat such out-of-range counts (and 0) as unknown and use one cell.
+        const uint32_t num_cpus =
+            (kNumCpus == 0 || kNumCpus > kMaxCells) ? 1 : kNumCpus;
+        // Calculate the size. Nearest power of two >= num_cpus.
+        n = 1;
+        while (static_cast<uint32_t>(n) < num_cpus) {
+          n <<= 1;
+        }
+        // Allocate cache-aligned memory for use by the cells_ table.
+        int err = posix_memalign(&cell_buffer_, CACHELINE_SIZE, sizeof(Cell)*n);
+        CHECK_EQ(0, err) << "error calling posix_memalign" << std::endl;
+        // Initialize the table
+        cells_ = new (cell_buffer_) Cell[n];
+        // Publish the size only after cells_ is set; readers Acquire_Load it.
+        base::subtle::Release_Store(&num_cells_, n);
+      }
+      // End critical section
+      busy_.Store(0);
+    } else {
+      // Fallback to adding to the base value.
+      // Means the table wasn't initialized or we failed to init it.
+      int64_t v = base_.value_.Load();
+      if (CasBase(v, Fn(v, x))) {
+        break;
+      }
+    }
+  }
+  // Record index for next time
+  tls_hashcode_ = h;
+}
+
+// Stores 'initialValue' into base_ and into every cell currently in the table.
+void Striped64::InternalReset(int64_t initialValue) {
+  const int32_t num_cells = base::subtle::Acquire_Load(&num_cells_);
+  base_.value_.Store(initialValue);
+  int32_t idx = 0;
+  while (idx < num_cells) {
+    cells_[idx].value_.Store(initialValue);
+    ++idx;
+  }
+}
+
+// Adds 'x' to the counter. Fast path: a single CAS on either this thread's
+// Cell (if the table exists) or base_. Any CAS failure is handed to
+// RetryUpdate().
+void LongAdder::IncrementBy(int64_t x) {
+  const int32_t n = base::subtle::Acquire_Load(&num_cells_);
+  if (n <= 0) {
+    // No table yet: try the base counter. If that CAS loses, RetryUpdate()
+    // will initialize the table. No rehash is needed, since the contention
+    // was on base_, not on a Cell.
+    const int64_t b = base_.value_.Load();
+    if (!base_.CompareAndSet(b, b + x)) {
+      RetryUpdate(x, kNoRehash);
+    }
+    return;
+  }
+
+  Cell* const cell = cells_ + ((n - 1) & get_tls_hashcode());
+  DCHECK_EQ(0, reinterpret_cast<const uintptr_t>(cell) & (sizeof(Cell) - 1))
+      << " unaligned Cell not allowed for Striped64" << std::endl;
+  const int64_t old = cell->value_.Load();
+  if (!cell->CompareAndSet(old, old + x)) {
+    // Contention on this thread's Cell: let RetryUpdate() rehash and retry.
+    RetryUpdate(x, kRehash);
+  }
+}
+
+//
+// LongAdder
+//
+
+// Returns base_ plus the sum of every cell. Each value is loaded separately,
+// so concurrent updates may or may not be reflected.
+int64_t LongAdder::Value() const {
+  int64_t total = base_.value_.Load();
+  const int32_t num_cells = base::subtle::Acquire_Load(&num_cells_);
+  for (int32_t idx = 0; idx < num_cells; ++idx) {
+    total += cells_[idx].value_.Load();
+  }
+  return total;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/striped64.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/striped64.h b/be/src/kudu/util/striped64.h
new file mode 100644
index 0000000..a3b829b
--- /dev/null
+++ b/be/src/kudu/util/striped64.h
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_UTIL_STRIPED64_H_
+#define KUDU_UTIL_STRIPED64_H_
+
+#include "kudu/gutil/port.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/threadlocal.h"
+
+namespace kudu {
+
+class Striped64;
+
+namespace striped64 {
+namespace internal {
+
+// Size in bytes of the counter each Cell holds; used only to size the padding
+// below and #undef'd right after the class.
+#define ATOMIC_INT_SIZE sizeof(AtomicInt<int64_t>)
+// Padded POD container for AtomicInt. This prevents false sharing of cache lines.
+// 'pad' fills the rest of a CACHELINE_SIZE line after value_ (or is a single
+// byte if the atomic is already at least that large), and the class is
+// declared CACHELINE_ALIGNED.
+class Cell {
+ public:
+  // Initializes value_ to 0.
+  Cell();
+  // Forwards to value_.CompareAndSet(cmp, value). Callers treat a false
+  // return as a failed CAS, i.e. contention on this Cell.
+  inline bool CompareAndSet(int64_t cmp, int64_t value) {
+    return value_.CompareAndSet(cmp, value);
+  }
+
+  // Padding advice from Herb Sutter:
+  // http://www.drdobbs.com/parallel/eliminate-false-sharing/217500206?pgno=4
+  AtomicInt<int64_t> value_;
+  char pad[CACHELINE_SIZE > ATOMIC_INT_SIZE ?
+           CACHELINE_SIZE - ATOMIC_INT_SIZE : 1];
+
+  DISALLOW_COPY_AND_ASSIGN(Cell);
+} CACHELINE_ALIGNED;
+#undef ATOMIC_INT_SIZE
+
+} // namespace internal
+} // namespace striped64
+
+// This set of classes is heavily derived from JSR166e, released into the public domain
+// by Doug Lea and the other authors.
+//
+// See: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/Striped64.java?view=co
+// See: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/LongAdder.java?view=co
+//
+// The Striped64 and LongAdder implementations here are simplified versions of what's present in
+// JSR166e. However, the core ideas remain the same.
+//
+// Updating a single AtomicInteger in a multi-threaded environment can be quite slow:
+//
+//   1. False sharing of cache lines with other counters.
+//   2. Cache line bouncing from high update rates, especially with many cores.
+//
+// These two problems are addressed by Striped64. When there is no contention, it uses CAS on a
+// single base counter to store updates. However, when Striped64 detects contention
+// (via a failed CAS operation), it will allocate a small, fixed size hashtable of Cells.
+// A Cell is a simple POD that pads out an AtomicInt to 64 bytes to prevent
+// sharing a cache line.
+//
+// Reading the value of a Striped64 requires traversing the hashtable to calculate the true sum.
+//
+// Each updating thread uses a thread-local hashcode to determine its Cell in the hashtable.
+// If a thread fails to CAS its hashed Cell, it will do a lightweight rehash operation to try
+// and find an uncontended bucket. Because the hashcode is thread-local, this rehash affects all
+// Striped64s accessed by the thread. This is good, since contention on one Striped64 is
+// indicative of contention elsewhere too.
+//
+// The hashtable is statically sized to the nearest power of 2 greater than or equal to the
+// number of CPUs. This is sufficient, since this guarantees the existence of a perfect hash
+// function. Due to the random rehashing, the threads should eventually converge to this function.
+// In practice, this scheme has been shown to be sufficient.
+//
+// The biggest simplification of this implementation compared to JSR166e is that we do not
+// dynamically grow the table, instead immediately allocating it to the full size.
+// We also do not lazily allocate each Cell, instead allocating the entire array at once.
+// This means we waste some additional memory in low contention scenarios, and initial allocation
+// will also be slower. Some of the micro-optimizations were also elided for readability.
+class Striped64 {
+ public:
+  Striped64();
+  virtual ~Striped64();
+
+ protected:
+
+  // Tells RetryUpdate() whether the caller's CAS on its hashed Cell already
+  // failed (kRehash: pick a different Cell before retrying) or whether the
+  // contention was elsewhere (kNoRehash).
+  enum Rehash {
+    kRehash,
+    kNoRehash
+  };
+
+  // CAS the base field.
+  bool CasBase(int64_t cmp, int64_t val) { return base_.CompareAndSet(cmp, val); }
+
+  // CAS the busy field from 0 to 1 to acquire the lock.
+  // The lock is released by storing 0 back into busy_.
+  bool CasBusy() { return busy_.CompareAndSet(0, 1); }
+
+  // Computes the function of the current and new value. Used in RetryUpdate.
+  virtual int64_t Fn(int64_t current_value, int64_t new_value) = 0;
+
+  // Handles cases of updates involving initialization, resizing, creating new Cells, and/or
+  // contention. See above for further explanation.
+  void RetryUpdate(int64_t x, Rehash to_rehash);
+
+  // Sets base and all cells to the given value.
+  void InternalReset(int64_t initialValue);
+
+  // Base value, used mainly when there is no contention, but also as a fallback during
+  // table initialization races. Updated via CAS.
+  striped64::internal::Cell base_;
+
+  // CAS lock used when resizing and/or creating cells.
+  AtomicBool busy_;
+
+  // Backing buffer for cells_, used for alignment.
+  // Allocated with posix_memalign() and released with free().
+  void* cell_buffer_;
+
+  // Table of cells. When non-null, size is the nearest power of 2 >= NCPU.
+  striped64::internal::Cell* cells_;
+  // Number of entries in cells_, or 0 before the table is built. It is
+  // written with Release_Store after cells_ is set and read with
+  // Acquire_Load, so a reader that sees a positive count also sees the table.
+  int32_t num_cells_;
+
+ protected:
+  // Returns the calling thread's hashcode, initializing it on first use.
+  static uint64_t get_tls_hashcode();
+
+ private:
+  // Static hash code per-thread. Shared across all instances to limit thread-local pollution.
+  // Also, if a thread hits a collision on one Striped64, it's also likely to collide on
+  // other Striped64s too.
+  static __thread uint64_t tls_hashcode_;
+
+  // Number of CPUs, to place bound on table size.
+  static const uint32_t kNumCpus;
+
+};
+
+// A 64-bit number optimized for high-volume concurrent updates.
+// See Striped64 for a longer explanation of the inner workings.
+// Striped64 is inherited privately (the default for 'class'), so only the
+// members declared below form LongAdder's public interface.
+class LongAdder : Striped64 {
+ public:
+  LongAdder() {}
+  // Adds x (which may be negative) to the counter.
+  void IncrementBy(int64_t x);
+  void Increment() { IncrementBy(1); }
+  void Decrement() { IncrementBy(-1); }
+
+  // Returns the current value.
+  // Note this is not an atomic snapshot in the presence of concurrent updates.
+  int64_t Value() const;
+
+  // Resets the counter state to zero.
+  void Reset() { InternalReset(0); }
+
+ private:
+  // Combining function used by Striped64::RetryUpdate(): plain addition.
+  int64_t Fn(int64_t current_value, int64_t new_value) override {
+    return current_value + new_value;
+  }
+
+  DISALLOW_COPY_AND_ASSIGN(LongAdder);
+};
+
+} // namespace kudu
+
+#endif


Mime
View raw message