impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From taras...@apache.org
Subject [07/30] incubator-impala git commit: IMPALA-4669: [KUTIL] Import kudu_util library from kudu@314c9d8
Date Sat, 17 Jun 2017 07:25:33 GMT
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/protoc-gen-insertions.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/protoc-gen-insertions.cc b/be/src/kudu/util/protoc-gen-insertions.cc
new file mode 100644
index 0000000..d8769aa
--- /dev/null
+++ b/be/src/kudu/util/protoc-gen-insertions.cc
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Simple protoc plugin which inserts some code at the top of each generated protobuf.
+// Currently, this just adds an include of protobuf-annotations.h, a file which hooks up
+// the protobuf concurrency annotations to our TSAN annotations.
+#include <glog/logging.h>
+#include <google/protobuf/compiler/code_generator.h>
+#include <google/protobuf/compiler/plugin.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/io/printer.h>
+#include <google/protobuf/io/zero_copy_stream.h>
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/gutil/strings/substitute.h"
+
+using google::protobuf::io::ZeroCopyOutputStream;
+using google::protobuf::io::Printer;
+
+namespace kudu {
+
+static const char* const kIncludeToInsert = "#include \"kudu/util/protobuf-annotations.h\"\n";
+static const char* const kProtoExtension = ".proto";
+
+class InsertAnnotations : public ::google::protobuf::compiler::CodeGenerator {
+  virtual bool Generate(const google::protobuf::FileDescriptor *file,
+                        const std::string &/*param*/,
+                        google::protobuf::compiler::GeneratorContext *gen_context,
+                        std::string *error) const OVERRIDE {
+
+    // Determine the file name we will substitute into.
+    string path_no_extension;
+    if (!TryStripSuffixString(file->name(), kProtoExtension, &path_no_extension)) {
+      *error = strings::Substitute("file name $0 did not end in $1", file->name(), kProtoExtension);
+      return false;
+    }
+    string pb_file = path_no_extension + ".pb.cc";
+
+    // Actually insert the new #include
+    gscoped_ptr<ZeroCopyOutputStream> inserter(gen_context->OpenForInsert(pb_file, "includes"));
+    Printer printer(inserter.get(), '$');
+    printer.Print(kIncludeToInsert);
+
+    if (printer.failed()) {
+      *error = "Failed to print to output file";
+      return false;
+    }
+
+    return true;
+  }
+};
+
+} // namespace kudu
+
+int main(int argc, char *argv[]) {
+  kudu::InsertAnnotations generator;
+  return google::protobuf::compiler::PluginMain(argc, argv, &generator);
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/pstack_watcher-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pstack_watcher-test.cc b/be/src/kudu/util/pstack_watcher-test.cc
new file mode 100644
index 0000000..652fec2
--- /dev/null
+++ b/be/src/kudu/util/pstack_watcher-test.cc
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/pstack_watcher.h"
+
+#include <gtest/gtest.h>
+#include <memory>
+#include <poll.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <vector>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/bitmap.h"
+#include "kudu/util/env.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/test_macros.h"
+
+using std::shared_ptr;
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+
+TEST(TestPstackWatcher, TestPstackWatcherCancellation) {
+  PstackWatcher watcher(MonoDelta::FromSeconds(1000000));
+  watcher.Shutdown();
+}
+
+TEST(TestPstackWatcher, TestWait) {
+  PstackWatcher watcher(MonoDelta::FromMilliseconds(10));
+  watcher.Wait();
+}
+
+TEST(TestPstackWatcher, TestDumpStacks) {
+  ASSERT_OK(PstackWatcher::DumpStacks());
+}
+
+static shared_ptr<FILE> RedirectStdout(string *temp_path) {
+  string temp_dir;
+  CHECK_OK(Env::Default()->GetTestDirectory(&temp_dir));
+  *temp_path = Substitute("$0/pstack_watcher-dump.$1.txt",
+                      temp_dir, getpid());
+  return shared_ptr<FILE>(
+      freopen(temp_path->c_str(), "w", stdout), fclose);
+}
+
+TEST(TestPstackWatcher, TestPstackWatcherRunning) {
+  string stdout_file;
+  int old_stdout;
+  CHECK_ERR(old_stdout = dup(STDOUT_FILENO));
+  {
+    shared_ptr<FILE> out_fp = RedirectStdout(&stdout_file);
+    PCHECK(out_fp.get());
+    PstackWatcher watcher(MonoDelta::FromMilliseconds(500));
+    while (watcher.IsRunning()) {
+      SleepFor(MonoDelta::FromMilliseconds(1));
+    }
+  }
+  CHECK_ERR(dup2(old_stdout, STDOUT_FILENO));
+  PCHECK(stdout = fdopen(STDOUT_FILENO, "w"));
+
+  faststring contents;
+  CHECK_OK(ReadFileToString(Env::Default(), stdout_file, &contents));
+  ASSERT_STR_CONTAINS(contents.ToString(), "BEGIN STACKS");
+  CHECK_ERR(unlink(stdout_file.c_str()));
+  ASSERT_GE(fprintf(stdout, "%s\n", contents.ToString().c_str()), 0)
+      << "errno=" << errno << ": " << ErrnoToString(errno);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/pstack_watcher.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pstack_watcher.cc b/be/src/kudu/util/pstack_watcher.cc
new file mode 100644
index 0000000..4ba7ada
--- /dev/null
+++ b/be/src/kudu/util/pstack_watcher.cc
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/pstack_watcher.h"
+
+#include <memory>
+#include <stdio.h>
+#include <string>
+#include <sys/types.h>
+#include <unistd.h>
+#include <vector>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/env.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/status.h"
+#include "kudu/util/subprocess.h"
+
+namespace kudu {
+
+using std::shared_ptr;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+PstackWatcher::PstackWatcher(MonoDelta timeout)
+    : timeout_(std::move(timeout)), running_(true), cond_(&lock_) {
+  CHECK_OK(Thread::Create("pstack_watcher", "pstack_watcher",
+                 boost::bind(&PstackWatcher::Run, this), &thread_));
+}
+
+PstackWatcher::~PstackWatcher() {
+  Shutdown();
+}
+
+void PstackWatcher::Shutdown() {
+  {
+    MutexLock guard(lock_);
+    running_ = false;
+    cond_.Broadcast();
+  }
+  if (thread_) {
+    CHECK_OK(ThreadJoiner(thread_.get()).Join());
+    thread_.reset();
+  }
+}
+
+bool PstackWatcher::IsRunning() const {
+  MutexLock guard(lock_);
+  return running_;
+}
+
+void PstackWatcher::Wait() const {
+  MutexLock lock(lock_);
+  while (running_) {
+    cond_.Wait();
+  }
+}
+
+void PstackWatcher::Run() {
+  MutexLock guard(lock_);
+  if (!running_) return;
+  cond_.TimedWait(timeout_);
+  if (!running_) return;
+
+  WARN_NOT_OK(DumpStacks(DUMP_FULL), "Unable to print pstack from watcher");
+  running_ = false;
+  cond_.Broadcast();
+}
+
+Status PstackWatcher::HasProgram(const char* progname) {
+  Subprocess proc({ "which", progname } );
+  proc.DisableStderr();
+  proc.DisableStdout();
+  RETURN_NOT_OK_PREPEND(proc.Start(),
+      Substitute("HasProgram($0): error running 'which'", progname));
+  RETURN_NOT_OK(proc.Wait());
+  int exit_status;
+  string exit_info;
+  RETURN_NOT_OK(proc.GetExitStatus(&exit_status, &exit_info));
+  if (exit_status == 0) {
+    return Status::OK();
+  }
+  return Status::NotFound(Substitute("can't find $0: $1", progname, exit_info));
+}
+
+Status PstackWatcher::DumpStacks(int flags) {
+  return DumpPidStacks(getpid(), flags);
+}
+
+Status PstackWatcher::DumpPidStacks(pid_t pid, int flags) {
+
+  // Prefer GDB if available; it gives us line numbers and thread names.
+  if (HasProgram("gdb").ok()) {
+    return RunGdbStackDump(pid, flags);
+  }
+
+  // Otherwise, try to use pstack or gstack.
+  const char *progname = nullptr;
+  if (HasProgram("pstack").ok()) {
+    progname = "pstack";
+  } else if (HasProgram("gstack").ok()) {
+    progname = "gstack";
+  }
+
+  if (!progname) {
+    return Status::ServiceUnavailable("Neither gdb, pstack, nor gstack appears to be installed.");
+  }
+  return RunPstack(progname, pid);
+}
+
+Status PstackWatcher::RunGdbStackDump(pid_t pid, int flags) {
+  // Command: gdb -quiet -batch -nx -ex cmd1 -ex cmd2 /proc/$PID/exe $PID
+  vector<string> argv;
+  argv.push_back("gdb");
+  // Don't print introductory version/copyright messages.
+  argv.push_back("-quiet");
+  // Exit after processing all of the commands below.
+  argv.push_back("-batch");
+  // Don't run commands from .gdbinit
+  argv.push_back("-nx");
+  // On RHEL6 and older Ubuntu, we occasionally would see gdb spin forever
+  // trying to collect backtraces. Setting a backtrace limit is a reasonable
+  // workaround, since we don't really expect >100-deep stacks anyway.
+  //
+  // See https://bugs.launchpad.net/ubuntu/+source/gdb/+bug/434168
+  argv.push_back("-ex");
+  argv.push_back("set backtrace limit 100");
+  argv.push_back("-ex");
+  argv.push_back("set print pretty on");
+  argv.push_back("-ex");
+  argv.push_back("info threads");
+  argv.push_back("-ex");
+  argv.push_back("thread apply all bt");
+  if (flags & DUMP_FULL) {
+    argv.push_back("-ex");
+    argv.push_back("thread apply all bt full");
+  }
+  string executable;
+  Env* env = Env::Default();
+  RETURN_NOT_OK(env->GetExecutablePath(&executable));
+  argv.push_back(executable);
+  argv.push_back(Substitute("$0", pid));
+  return RunStackDump(argv);
+}
+
+Status PstackWatcher::RunPstack(const std::string& progname, pid_t pid) {
+  string pid_string(Substitute("$0", pid));
+  vector<string> argv;
+  argv.push_back(progname);
+  argv.push_back(pid_string);
+  return RunStackDump(argv);
+}
+
+Status PstackWatcher::RunStackDump(const vector<string>& argv) {
+  printf("************************ BEGIN STACKS **************************\n");
+  if (fflush(stdout) == EOF) {
+    return Status::IOError("Unable to flush stdout", ErrnoToString(errno), errno);
+  }
+  Subprocess pstack_proc(argv);
+  RETURN_NOT_OK_PREPEND(pstack_proc.Start(), "RunStackDump proc.Start() failed");
+  if (::close(pstack_proc.ReleaseChildStdinFd()) == -1) {
+    return Status::IOError("Unable to close child stdin", ErrnoToString(errno), errno);
+  }
+  RETURN_NOT_OK_PREPEND(pstack_proc.Wait(), "RunStackDump proc.Wait() failed");
+  int exit_code;
+  string exit_info;
+  RETURN_NOT_OK_PREPEND(pstack_proc.GetExitStatus(&exit_code, &exit_info),
+                        "RunStackDump proc.GetExitStatus() failed");
+  if (exit_code != 0) {
+    return Status::RuntimeError("RunStackDump proc.Wait() error", exit_info);
+  }
+  printf("************************* END STACKS ***************************\n");
+  if (fflush(stdout) == EOF) {
+    return Status::IOError("Unable to flush stdout", ErrnoToString(errno), errno);
+  }
+
+  return Status::OK();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/pstack_watcher.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pstack_watcher.h b/be/src/kudu/util/pstack_watcher.h
new file mode 100644
index 0000000..396bf94
--- /dev/null
+++ b/be/src/kudu/util/pstack_watcher.h
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_PSTACK_WATCHER_H
+#define KUDU_UTIL_PSTACK_WATCHER_H
+
+#include <string>
+#include <vector>
+
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+namespace kudu {
+
+// PstackWatcher is an object which will pstack the current process and print
+// the results to stdout.  It does this after a certain timeout has occurred.
+class PstackWatcher {
+ public:
+
+  enum Flags {
+    NO_FLAGS = 0,
+
+    // Run 'thread apply all bt full', which is very verbose output
+    DUMP_FULL = 1
+  };
+
+  // Static method to collect and write stack dump output to stdout of the current
+  // process.
+  static Status DumpStacks(int flags = NO_FLAGS);
+
+  // Like the above but for any process, not just the current one.
+  static Status DumpPidStacks(pid_t pid, int flags = NO_FLAGS);
+
+  // Instantiate a watcher that writes a pstack to stdout after the given
+  // timeout expires.
+  explicit PstackWatcher(MonoDelta timeout);
+
+  ~PstackWatcher();
+
+  // Shut down the watcher and do not log a pstack.
+  // This method is not thread-safe.
+  void Shutdown();
+
+  // Test whether the watcher is still running or has shut down.
+  // Thread-safe.
+  bool IsRunning() const;
+
+  // Wait until the timeout expires and the watcher logs a pstack.
+  // Thread-safe.
+  void Wait() const;
+
+ private:
+  // Test for the existence of the given program in the system path.
+  static Status HasProgram(const char* progname);
+
+  // Get a stack dump using GDB directly.
+  static Status RunGdbStackDump(pid_t pid, int flags);
+
+  // Get a stack dump using the pstack or gstack program.
+  static Status RunPstack(const std::string& progname, pid_t pid);
+
+  // Invoke and wait for the stack dump program.
+  static Status RunStackDump(const std::vector<std::string>& argv);
+
+  // Run the thread that waits for the specified duration before logging a
+  // pstack.
+  void Run();
+
+  const MonoDelta timeout_;
+  bool running_;
+  scoped_refptr<Thread> thread_;
+  mutable Mutex lock_;
+  mutable ConditionVariable cond_;
+};
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/random-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/random-test.cc b/be/src/kudu/util/random-test.cc
new file mode 100644
index 0000000..b40e90c
--- /dev/null
+++ b/be/src/kudu/util/random-test.cc
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <limits>
+#include <unordered_set>
+
+#include <glog/stl_logging.h>
+
+#include "kudu/util/random.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+class RandomTest : public KuduTest {
+ public:
+  RandomTest()
+      : rng_(SeedRandom()) {
+  }
+
+ protected:
+  Random rng_;
+};
+
+// Tests that after a certain number of invocations of Normal(), the
+// actual mean of all samples is within the specified standard
+// deviation of the target mean.
+TEST_F(RandomTest, TestNormalDist) {
+  const double kMean = 5.0;
+  const double kStdDev = 0.01;
+  const int kNumIters = 100000;
+
+  double sum = 0.0;
+  for (int i = 0; i < kNumIters; ++i) {
+    sum += rng_.Normal(kMean, kStdDev);
+  }
+
+  ASSERT_LE(fabs((sum / static_cast<double>(kNumIters)) - kMean), kStdDev);
+}
+
+// Tests that after a large number of invocations of Next32() and Next64(), we
+// have flipped all the bits we claim we should have.
+//
+// This is a regression test for a bug where we were incorrectly bit-shifting
+// in Next64().
+//
+// Note: Our RNG actually only generates 31 bits of randomness for 32 bit
+// integers. If all bits need to be randomized, callers must use Random::Next64().
+// This test reflects that, and if we change the RNG algo this test should also change.
+TEST_F(RandomTest, TestUseOfBits) {
+  // For Next32():
+  uint32_t ones32 = std::numeric_limits<uint32_t>::max();
+  uint32_t zeroes32 = 0;
+  // For Next64():
+  uint64_t ones64 = std::numeric_limits<uint64_t>::max();
+  uint64_t zeroes64 = 0;
+
+  for (int i = 0; i < 10000000; i++) {
+    uint32_t r32 = rng_.Next32();
+    ones32 &= r32;
+    zeroes32 |= r32;
+
+    uint64_t r64 = rng_.Next64();
+    ones64 &= r64;
+    zeroes64 |= r64;
+  }
+
+  // At the end, we should have flipped 31 and 64 bits, respectively. One
+  // detail of the current RNG impl is that Next32() always returns a number
+  // with MSB set to 0.
+  uint32_t expected_bits_31 = std::numeric_limits<uint32_t>::max() >> 1;
+  uint64_t expected_bits_64 = std::numeric_limits<uint64_t>::max();
+
+  ASSERT_EQ(0, ones32);
+  ASSERT_EQ(expected_bits_31, zeroes32);
+  ASSERT_EQ(0, ones64);
+  ASSERT_EQ(expected_bits_64, zeroes64);
+}
+
+TEST_F(RandomTest, TestResetSeed) {
+  rng_.Reset(1);
+  uint64_t first = rng_.Next64();
+  rng_.Reset(1);
+  uint64_t second = rng_.Next64();
+  ASSERT_EQ(first, second);
+}
+
+TEST_F(RandomTest, TestReservoirSample) {
+  // Use a constant seed to avoid flakiness.
+  rng_.Reset(12345);
+
+  vector<int> population;
+  for (int i = 0; i < 100; i++) {
+    population.push_back(i);
+  }
+
+  // Run 1000 trials selecting 5 elements.
+  vector<int> results;
+  vector<int> counts(population.size());
+  std::unordered_set<int> avoid;
+  for (int trial = 0; trial < 1000; trial++) {
+    rng_.ReservoirSample(population, 5, avoid, &results);
+    for (int result : results) {
+      counts[result]++;
+    }
+  }
+
+  // We expect each element to be selected
+  // 50 times on average, but since it's random, it won't be exact.
+  // However, since we use a constant seed, this test won't be flaky.
+  for (int count : counts) {
+    ASSERT_GE(count, 25);
+    ASSERT_LE(count, 75);
+  }
+
+  // Run again, but avoid some particular entries.
+  avoid.insert(3);
+  avoid.insert(10);
+  avoid.insert(20);
+  counts.assign(100, 0);
+  for (int trial = 0; trial < 1000; trial++) {
+    rng_.ReservoirSample(population, 5, avoid, &results);
+    for (int result : results) {
+      counts[result]++;
+    }
+  }
+
+  // Ensure that we didn't ever pick the avoided elements.
+  ASSERT_EQ(0, counts[3]);
+  ASSERT_EQ(0, counts[10]);
+  ASSERT_EQ(0, counts[20]);
+}
+
+TEST_F(RandomTest, TestReservoirSamplePopulationTooSmall) {
+  vector<int> population;
+  for (int i = 0; i < 10; i++) {
+    population.push_back(i);
+  }
+
+  vector<int> results;
+  std::unordered_set<int> avoid;
+  rng_.ReservoirSample(population, 20, avoid, &results);
+  ASSERT_EQ(population.size(), results.size());
+  ASSERT_EQ(population, results);
+
+  rng_.ReservoirSample(population, 10, avoid, &results);
+  ASSERT_EQ(population.size(), results.size());
+  ASSERT_EQ(population, results);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/random.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/random.h b/be/src/kudu/util/random.h
new file mode 100644
index 0000000..e31e475
--- /dev/null
+++ b/be/src/kudu/util/random.h
@@ -0,0 +1,252 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#ifndef KUDU_UTIL_RANDOM_H_
+#define KUDU_UTIL_RANDOM_H_
+
+#include <cmath>
+#include <cstdint>
+#include <mutex>
+#include <random>
+#include <vector>
+
+#include "kudu/gutil/casts.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/util/locks.h"
+
+namespace kudu {
+
+namespace random_internal {
+
+static const uint32_t M = 2147483647L;   // 2^31-1
+
+} // namespace random_internal
+
+template<class R>
+class StdUniformRNG;
+
+// A very simple random number generator.  Not especially good at
+// generating truly random bits, but good enough for our needs in this
+// package. This implementation is not thread-safe.
+class Random {
+ private:
+  uint32_t seed_;
+ public:
+  explicit Random(uint32_t s) {
+    Reset(s);
+  }
+
+  // Reset the RNG to the given seed value.
+  void Reset(uint32_t s) {
+    seed_ = s & 0x7fffffffu;
+    // Avoid bad seeds.
+    if (seed_ == 0 || seed_ == random_internal::M) {
+      seed_ = 1;
+    }
+  }
+
+  // Next pseudo-random 32-bit unsigned integer.
+  // FIXME: This currently only generates 31 bits of randomness.
+  // The MSB will always be zero.
+  uint32_t Next() {
+    static const uint64_t A = 16807;  // bits 14, 8, 7, 5, 2, 1, 0
+    // We are computing
+    //       seed_ = (seed_ * A) % M,    where M = 2^31-1
+    //
+    // seed_ must not be zero or M, or else all subsequent computed values
+    // will be zero or M respectively.  For all other values, seed_ will end
+    // up cycling through every number in [1,M-1]
+    uint64_t product = seed_ * A;
+
+    // Compute (product % M) using the fact that ((x << 31) % M) == x.
+    seed_ = static_cast<uint32_t>((product >> 31) + (product & random_internal::M));
+    // The first reduction may overflow by 1 bit, so we may need to
+    // repeat.  mod == M is not possible; using > allows the faster
+    // sign-bit-based test.
+    if (seed_ > random_internal::M) {
+      seed_ -= random_internal::M;
+    }
+    return seed_;
+  }
+
+  // Alias for consistency with Next64
+  uint32_t Next32() { return Next(); }
+
+  // Next pseudo-random 64-bit unsigned integer.
+  uint64_t Next64() {
+    uint64_t large = Next();
+    large <<= 31;
+    large |= Next();
+    // Fill in the highest two MSBs.
+    large |= implicit_cast<uint64_t>(Next32()) << 62;
+    return large;
+  }
+
+  // Returns a uniformly distributed value in the range [0..n-1]
+  // REQUIRES: n > 0
+  uint32_t Uniform(uint32_t n) { return Next() % n; }
+
+  // Alias for consistency with Uniform64
+  uint32_t Uniform32(uint32_t n) { return Uniform(n); }
+
+  // Returns a uniformly distributed 64-bit value in the range [0..n-1]
+  // REQUIRES: n > 0
+  uint64_t Uniform64(uint64_t n) { return Next64() % n; }
+
+  // Randomly returns true ~"1/n" of the time, and false otherwise.
+  // REQUIRES: n > 0
+  bool OneIn(int n) { return (Next() % n) == 0; }
+
+  // Skewed: pick "base" uniformly from range [0,max_log] and then
+  // return "base" random bits.  The effect is to pick a number in the
+  // range [0,2^max_log-1] with exponential bias towards smaller numbers.
+  uint32_t Skewed(int max_log) {
+    return Uniform(1 << Uniform(max_log + 1));
+  }
+
+  // Samples a random number from the given normal distribution.
+  double Normal(double mean, double std_dev);
+
+  // Return a random number strictly between 0.0 and 1.0 (both endpoints excluded).
+  double NextDoubleFraction() {
+    return Next() / static_cast<double>(random_internal::M + 1.0);
+  }
+
+  // Sample 'k' random elements from the collection 'c' into 'result', taking care not to sample any
+  // elements that are already present in 'avoid'.
+  //
+  // In the case that 'c' has fewer than 'k' elements then all elements in 'c' will be selected.
+  //
+  // 'c' should be an iterable STL collection such as a vector, set, or list.
+  // 'avoid' should be an STL-compatible set.
+  //
+  // The results are not stored in a randomized order: the order of results will
+  // match their order in the input collection.
+  template<class Collection, class Set, class T>
+  void ReservoirSample(const Collection& c, int k, const Set& avoid,
+                       std::vector<T>* result) {
+    result->clear();
+    result->reserve(k);
+    int i = 0;
+    for (const T& elem : c) {
+      if (ContainsKey(avoid, elem)) {
+        continue;
+      }
+      i++;
+      // Fill the reservoir if there is available space.
+      if (result->size() < k) {
+        result->push_back(elem);
+        continue;
+      }
+      // Otherwise replace existing elements with decreasing probability.
+      int j = Uniform(i);
+      if (j < k) {
+        (*result)[j] = elem;
+      }
+    }
+  }
+};
+
+// Thread-safe wrapper around Random.
+class ThreadSafeRandom {
+ public:
+  explicit ThreadSafeRandom(uint32_t s)
+      : random_(s) {
+  }
+
+  void Reset(uint32_t s) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    random_.Reset(s);
+  }
+
+  uint32_t Next() {
+    std::lock_guard<simple_spinlock> l(lock_);
+    return random_.Next();
+  }
+
+  uint32_t Next32() {
+    std::lock_guard<simple_spinlock> l(lock_);
+    return random_.Next32();
+  }
+
+  uint64_t Next64() {
+    std::lock_guard<simple_spinlock> l(lock_);
+    return random_.Next64();
+  }
+
+  uint32_t Uniform(uint32_t n) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    return random_.Uniform(n);
+  }
+
+  uint32_t Uniform32(uint32_t n) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    return random_.Uniform32(n);
+  }
+
+  uint64_t Uniform64(uint64_t n) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    return random_.Uniform64(n);
+  }
+
+  bool OneIn(int n) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    return random_.OneIn(n);
+  }
+
+  uint32_t Skewed(int max_log) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    return random_.Skewed(max_log);
+  }
+
+  double Normal(double mean, double std_dev) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    return random_.Normal(mean, std_dev);
+  }
+
+  double NextDoubleFraction() {
+    std::lock_guard<simple_spinlock> l(lock_);
+    return random_.NextDoubleFraction();
+  }
+
+  template<class Collection, class Set, class T>
+  void ReservoirSample(const Collection& c, int k, const Set& avoid,
+                       std::vector<T>* result) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    random_.ReservoirSample(c, k, avoid, result);
+  }
+
+ private:
+  simple_spinlock lock_;
+  Random random_;
+};
+
+// Wraps either Random or ThreadSafeRandom as a C++ standard library
+// compliant UniformRandomNumberGenerator:
+//   http://en.cppreference.com/w/cpp/concept/UniformRandomNumberGenerator
+template<class R>
+class StdUniformRNG {
+ public:
+  typedef uint32_t result_type;
+
+  explicit StdUniformRNG(R* r) : r_(r) {}
+  uint32_t operator()() {
+    return r_->Next32();
+  }
+  constexpr static uint32_t min() { return 0; }
+  constexpr static uint32_t max() { return (1L << 31) - 1; }
+
+ private:
+  R* r_;
+};
+
+// Defined outside the class to make use of StdUniformRNG above.
+inline double Random::Normal(double mean, double std_dev) {
+  std::normal_distribution<> nd(mean, std_dev);
+  StdUniformRNG<Random> gen(this);
+  return nd(gen);
+}
+
+}  // namespace kudu
+
+#endif  // KUDU_UTIL_RANDOM_H_

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/random_util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/random_util-test.cc b/be/src/kudu/util/random_util-test.cc
new file mode 100644
index 0000000..f3eb7d5
--- /dev/null
+++ b/be/src/kudu/util/random_util-test.cc
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/random_util.h"
+
+#include <algorithm>
+#include <cmath>
+#include <cstring>
+
+#include "kudu/util/random.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+class RandomUtilTest : public KuduTest {
+ protected:
+  RandomUtilTest() : rng_(SeedRandom()) {}
+
+  Random rng_;
+
+  static const int kLenMax = 100;
+  static const int kNumTrials = 100;
+};
+
+namespace {
+
+// Verifies that the 'stop'-byte buffer at 'start' is '\0' everywhere outside [from, to).
+void CheckEmpty(char* start, int from, int to, int stop) {
+  DCHECK_LE(0, from);
+  DCHECK_LE(from, to);
+  DCHECK_LE(to, stop);
+  for (int j = 0; (j == from ? j = to : j) < stop; ++j) {  // jumps over [from, to)
+    CHECK_EQ(start[j], '\0') << "Index " << j << " not null after defining "
+                             << "indices [" << from << "," << to << ") of "
+                             << "a nulled string [0," << stop << ").";
+  }
+}
+
+} // anonymous namespace
+
+// Makes sure that RandomString only writes the specified amount
+TEST_F(RandomUtilTest, TestRandomString) {
+  char start[kLenMax];
+
+  for (int i = 0; i < kNumTrials; ++i) {
+    memset(start, '\0', kLenMax);
+    int to = rng_.Uniform(kLenMax + 1);
+    int from = rng_.Uniform(to + 1);
+    RandomString(start + from, to - from, &rng_);
+    CheckEmpty(start, from, to, kLenMax);
+  }
+
+  // Corner case
+  memset(start, '\0', kLenMax);
+  RandomString(start, 0, &rng_);
+  CheckEmpty(start, 0, 0, kLenMax);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/random_util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/random_util.cc b/be/src/kudu/util/random_util.cc
new file mode 100644
index 0000000..21a4144
--- /dev/null
+++ b/be/src/kudu/util/random_util.cc
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/random_util.h"
+
+#include <cmath>
+#include <cstdlib>
+#include <cstring>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "kudu/util/env.h"
+#include "kudu/util/random.h"
+#include "kudu/gutil/walltime.h"
+
+namespace kudu {
+
+void RandomString(void* dest, size_t n, Random* rng) {  // Fills dest[0, n) with values from rng.
+  size_t i = 0;
+  uint32_t random = rng->Next();
+  char* cdest = static_cast<char*>(dest);
+  static const size_t sz = sizeof(random);
+  if (n >= sz) {  // Guard: 'n - sz' below is unsigned and would wrap around if n < sz.
+    for (i = 0; i <= n - sz; i += sz) {  // Copy as many whole sz-byte words as fit.
+      memcpy(&cdest[i], &random, sizeof(random));
+      random = rng->Next();
+    }
+  }
+  memcpy(cdest + i, &random, n - i);  // Copy the remaining n - i (fewer than sz) tail bytes.
+}
+
+uint32_t GetRandomSeed32() {  // Combines the current time with the process and thread ids.
+  uint32_t seed = static_cast<uint32_t>(GetCurrentTimeMicros());  // Narrowed to 32 bits.
+  seed *= getpid();  // Unsigned multiplication, so overflow simply wraps.
+  seed *= Env::Default()->gettid();
+  return seed;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/random_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/random_util.h b/be/src/kudu/util/random_util.h
new file mode 100644
index 0000000..e286bbe
--- /dev/null
+++ b/be/src/kudu/util/random_util.h
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_UTIL_RANDOM_UTIL_H
+#define KUDU_UTIL_RANDOM_UTIL_H
+
+#include <cstdlib>
+#include <stdint.h>
+
+namespace kudu {
+
+class Random;
+
+// Writes exactly n random bytes to dest using the supplied Random generator.
+// Note RandomString() does not null-terminate its output, though '\0' could
+// be written to dest with the same probability as any other byte.
+void RandomString(void* dest, size_t n, Random* rng);
+
+// Generates a 32-bit random seed from several sources, including timestamp,
+// pid & tid.
+uint32_t GetRandomSeed32();
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_RANDOM_UTIL_H

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/resettable_heartbeater-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/resettable_heartbeater-test.cc b/be/src/kudu/util/resettable_heartbeater-test.cc
new file mode 100644
index 0000000..0e4cf3b
--- /dev/null
+++ b/be/src/kudu/util/resettable_heartbeater-test.cc
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/resettable_heartbeater.h"
+
+#include <boost/bind.hpp>
+#include <gtest/gtest.h>
+#include <string>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+// Number of heartbeats we want to observe before allowing the test to end.
+static const int kNumHeartbeats = 2;
+
+class ResettableHeartbeaterTest : public KuduTest {
+ public:
+  ResettableHeartbeaterTest()
+    : KuduTest(),
+      latch_(kNumHeartbeats) {  // The latch opens after kNumHeartbeats heartbeat callbacks.
+  }
+
+ protected:
+  void CreateHeartbeater(uint64_t period_ms, const std::string& name) {
+    period_ms_ = period_ms;  // Remembered so WaitForCountDown() can size its timeout.
+    heartbeater_.reset(
+        new ResettableHeartbeater(name,
+                                  MonoDelta::FromMilliseconds(period_ms),
+                                  boost::bind(&ResettableHeartbeaterTest::HeartbeatFunction,
+                                              this)));
+  }
+
+  Status HeartbeatFunction() {  // Callback handed to the heartbeater.
+    latch_.CountDown();  // Record that one heartbeat fired.
+    return Status::OK();
+  }
+
+  void WaitForCountDown() {
+    // Wait a large multiple (in the worst case) of the required time before we
+    // time out and fail the test. Large to avoid test flakiness.
+    const uint64_t kMaxWaitMillis = period_ms_ * kNumHeartbeats * 20;
+    CHECK(latch_.WaitFor(MonoDelta::FromMilliseconds(kMaxWaitMillis)))
+        << "Failed to count down " << kNumHeartbeats << " times in " << kMaxWaitMillis
+        << " ms: latch count == " << latch_.count();
+  }
+
+  CountDownLatch latch_;  // Counted down once per observed heartbeat.
+  uint64_t period_ms_;  // Heartbeat period in ms; only set by CreateHeartbeater().
+  gscoped_ptr<ResettableHeartbeater> heartbeater_;  // The heartbeater under test.
+};
+
+// Tests that if Reset() is never called, the heartbeat method is called
+// the expected number of times.
+TEST_F(ResettableHeartbeaterTest, TestRegularHeartbeats) {
+  const int64_t kHeartbeatPeriodMillis = 100; // Heartbeat every 100ms.
+  CreateHeartbeater(kHeartbeatPeriodMillis, CURRENT_TEST_NAME());
+  ASSERT_OK(heartbeater_->Start());
+  WaitForCountDown();  // CHECK-fails unless kNumHeartbeats heartbeats arrive in time.
+  ASSERT_OK(heartbeater_->Stop());
+}
+
+// Tests that if we Reset() the heartbeater at intervals shorter than
+// the heartbeat period, the heartbeat method never gets called.
+// After we stop resetting, heartbeats should resume as normal.
+TEST_F(ResettableHeartbeaterTest, TestResetHeartbeats) {
+  const int64_t kHeartbeatPeriodMillis = 800;   // Heartbeat every 800ms.
+  const int64_t kNumResetSlicesPerPeriod = 40;  // Reset 40 times per heartbeat period.
+  // Reset once every 800ms / 40 = 20ms.
+  const int64_t kResetPeriodMillis = kHeartbeatPeriodMillis / kNumResetSlicesPerPeriod;
+
+  CreateHeartbeater(kHeartbeatPeriodMillis, CURRENT_TEST_NAME());
+  ASSERT_OK(heartbeater_->Start());
+  // Call Reset() in a loop for 2 heartbeat periods' worth of time, with sleeps
+  // in-between as defined above.
+  for (int i = 0; i < kNumResetSlicesPerPeriod * 2; i++) {
+    heartbeater_->Reset();
+    ASSERT_EQ(kNumHeartbeats, latch_.count()); // Ensure we haven't counted down, yet.
+    SleepFor(MonoDelta::FromMilliseconds(kResetPeriodMillis));
+  }
+  WaitForCountDown();  // With no more resets, the heartbeats must now arrive.
+  ASSERT_OK(heartbeater_->Stop());
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/resettable_heartbeater.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/resettable_heartbeater.cc b/be/src/kudu/util/resettable_heartbeater.cc
new file mode 100644
index 0000000..91c4587
--- /dev/null
+++ b/be/src/kudu/util/resettable_heartbeater.cc
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/resettable_heartbeater.h"
+
+#include <glog/logging.h>
+#include <mutex>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/random.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+namespace kudu {
+using std::string;
+
+class ResettableHeartbeaterThread {  // Implementation object behind ResettableHeartbeater.
+ public:
+  ResettableHeartbeaterThread(std::string name, MonoDelta period,
+                              HeartbeatFunction function);
+
+  Status Start();  // Creates the thread that runs RunThread().
+  Status Stop();  // Requests shutdown and joins the thread.
+  void Reset();  // Counts the latch down, interrupting RunThread()'s current wait.
+
+ private:
+  void RunThread();  // Body of the heartbeater thread.
+  bool IsCurrentThread() const;
+
+  const string name_;  // Used in log messages and in the thread's name.
+
+  // The heartbeat period.
+  const MonoDelta period_;
+
+  // The function to call to perform the heartbeat.
+  const HeartbeatFunction function_;
+
+  // The actual running thread (NULL before it is started).
+  scoped_refptr<kudu::Thread> thread_;
+
+  CountDownLatch run_latch_;  // RunThread() waits on this; Reset() and Stop() count it down.
+
+  // Whether the heartbeater should shutdown.
+  bool shutdown_;
+
+  // Lock held while accessing 'shutdown_' and while RunThread() re-arms
+  // 'run_latch_' through its Reset() method.
+  mutable simple_spinlock lock_;
+  DISALLOW_COPY_AND_ASSIGN(ResettableHeartbeaterThread);
+};
+
+ResettableHeartbeater::ResettableHeartbeater(const std::string& name,
+                                             MonoDelta period,
+                                             HeartbeatFunction function)
+    : thread_(new ResettableHeartbeaterThread(name, period, function)) {
+}  // All work is forwarded to the owned ResettableHeartbeaterThread.
+
+Status ResettableHeartbeater::Start() {
+  return thread_->Start();  // Forwards to the implementation object.
+}
+
+Status ResettableHeartbeater::Stop() {
+  return thread_->Stop();  // Forwards to the implementation object.
+}
+void ResettableHeartbeater::Reset() {
+  thread_->Reset();  // Forwards to the implementation object.
+}
+
+ResettableHeartbeater::~ResettableHeartbeater() {  // Stops the thread before destruction.
+  WARN_NOT_OK(Stop(), "Unable to stop heartbeater thread");
+}
+
+ResettableHeartbeaterThread::ResettableHeartbeaterThread(
+    std::string name, MonoDelta period, HeartbeatFunction function)
+    : name_(std::move(name)),
+      period_(std::move(period)),
+      function_(std::move(function)),
+      run_latch_(0),  // Start() re-arms the latch to 1 before creating the thread.
+      shutdown_(false) {}
+
+void ResettableHeartbeaterThread::RunThread() {  // Heartbeat loop; returns once shutdown_ is set.
+  CHECK(IsCurrentThread());
+  VLOG(1) << "Heartbeater: " << name_ << " thread starting";
+
+  bool prev_reset_was_manual = false;
+  Random rng(random());  // Only used to add jitter to the wait after a manual reset.
+  while (true) {
+    MonoDelta wait_period = period_;
+    if (prev_reset_was_manual) {
+      // When the caller does a manual reset, we randomize the subsequent wait
+      // timeout between period_/2 and period_. This builds in some jitter so
+      // multiple tablets on the same TS don't end up heartbeating in lockstep.
+      int64_t half_period_ms = period_.ToMilliseconds() / 2;
+      wait_period = MonoDelta::FromMilliseconds(
+          half_period_ms +
+          rng.NextDoubleFraction() * half_period_ms);
+      prev_reset_was_manual = false;
+    }
+    if (run_latch_.WaitFor(wait_period)) {
+      // CountDownLatch reached 0 -- either Reset() or Stop() counted it down.
+      prev_reset_was_manual = true;
+      std::lock_guard<simple_spinlock> lock(lock_);
+      // Check if we were told to shut down.
+      if (shutdown_) {
+        // Shutdown was requested -- exit the thread.
+        VLOG(1) << "Heartbeater: " << name_ << " thread finished";
+        return;
+      } else {
+        // Otherwise it's just a reset: re-arm the latch
+        // and start a new wait.
+        run_latch_.Reset(1);
+        continue;
+      }
+    }
+
+    Status s = function_();  // The wait ended without the latch firing: send a heartbeat.
+    if (!s.ok()) {
+      LOG(WARNING)<< "Failed to heartbeat in heartbeater: " << name_
+      << " Status: " << s.ToString();
+      continue;
+    }
+  }
+}
+
+bool ResettableHeartbeaterThread::IsCurrentThread() const {
+  return thread_.get() == kudu::Thread::current_thread();  // True when called on thread_ itself.
+}
+
+Status ResettableHeartbeaterThread::Start() {
+  CHECK(thread_ == nullptr);  // Must not have been started already.
+  run_latch_.Reset(1);  // Arm the latch: a single CountDown() wakes RunThread().
+  return kudu::Thread::Create("heartbeater", strings::Substitute("$0-heartbeat", name_),
+                              &ResettableHeartbeaterThread::RunThread,
+                              this, &thread_);
+}
+
+void ResettableHeartbeaterThread::Reset() {
+  if (!thread_) {  // Not started yet: nothing to reset.
+    return;
+  }
+  run_latch_.CountDown();  // Ends RunThread()'s current wait so it begins a new one.
+}
+
+Status ResettableHeartbeaterThread::Stop() {
+  if (!thread_) {  // Never started: nothing to stop.
+    return Status::OK();
+  }
+
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    if (shutdown_) {  // A previous Stop() already requested shutdown.
+      return Status::OK();
+    }
+    shutdown_ = true;
+  }
+
+  run_latch_.CountDown();  // Wake RunThread() so that it observes shutdown_.
+  RETURN_NOT_OK(ThreadJoiner(thread_.get()).Join());  // Wait for the thread to exit.
+  return Status::OK();
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/resettable_heartbeater.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/resettable_heartbeater.h b/be/src/kudu/util/resettable_heartbeater.h
new file mode 100644
index 0000000..40bbe29
--- /dev/null
+++ b/be/src/kudu/util/resettable_heartbeater.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_UTIL_RESETTABLE_HEARTBEATER_H_
+#define KUDU_UTIL_RESETTABLE_HEARTBEATER_H_
+
+#include <boost/function.hpp>
+#include <string>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+
+namespace kudu {
+class MonoDelta;
+class Status;
+class ResettableHeartbeaterThread;
+
+typedef boost::function<Status()> HeartbeatFunction;
+
+// A resettable heartbeater that takes a function and calls
+// it to perform a regular heartbeat, unless Reset() is called
+// in which case the heartbeater resets the heartbeat period.
+// The point is to send "I'm Alive" heartbeats only if no regular
+// messages are sent in the same period.
+//
+// TODO Eventually this should be used instead of the master heartbeater
+// as it shares a lot of logic with the exception of the specific master
+// stuff (and the fact that it is resettable).
+//
+// TODO We'll have a lot of these per server, so eventually we need
+// to refactor this so that multiple heartbeaters share something like
+// java's ScheduledExecutor.
+//
+// TODO Do something about failed heartbeats, right now this is just
+// logging. Probably could take more arguments and do more of an
+// exponential backoff.
+//
+// This class is thread safe.
+class ResettableHeartbeater {
+ public:
+  ResettableHeartbeater(const std::string& name,
+                        MonoDelta period,
+                        HeartbeatFunction function);
+
+  // Starts the heartbeater.
+  Status Start();
+
+  // Stops the heartbeater.
+  Status Stop();
+
+  // Resets the heartbeat period.
+  // When this is called, the subsequent heartbeat has some built-in jitter and
+  // may trigger before a full period (as specified to the constructor).
+  void Reset();
+
+  ~ResettableHeartbeater();
+ private:
+  gscoped_ptr<ResettableHeartbeaterThread> thread_;  // Implementation object; all calls forward to it.
+
+  DISALLOW_COPY_AND_ASSIGN(ResettableHeartbeater);
+};
+
+}  // namespace kudu
+
+#endif /* KUDU_UTIL_RESETTABLE_HEARTBEATER_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rle-encoding.h b/be/src/kudu/util/rle-encoding.h
new file mode 100644
index 0000000..120a159
--- /dev/null
+++ b/be/src/kudu/util/rle-encoding.h
@@ -0,0 +1,523 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef IMPALA_RLE_ENCODING_H
+#define IMPALA_RLE_ENCODING_H
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/util/bit-stream-utils.inline.h"
+#include "kudu/util/bit-util.h"
+
+namespace kudu {
+
+// Utility classes to do run length encoding (RLE) for fixed bit width values.  If runs
+// are sufficiently long, RLE is used, otherwise, the values are just bit-packed
+// (literal encoding).
+// For both types of runs, there is a byte-aligned indicator which encodes the length
+// of the run and the type of the run.
+// This encoding has the benefit that when there aren't any long enough runs, values
+// are always decoded at fixed (can be precomputed) bit offsets OR both the value and
+// the run length are byte aligned. This allows for very efficient decoding
+// implementations.
+// The encoding is:
+//    encoded-block := run*
+//    run := literal-run | repeated-run
+//    literal-run := literal-indicator < literal bytes >
+//    repeated-run := repeated-indicator < repeated value. padded to byte boundary >
+//    literal-indicator := varint_encode( number_of_groups << 1 | 1)
+//    repeated-indicator := varint_encode( number_of_repetitions << 1 )
+//
+// Each run is preceded by a varint. The varint's least significant bit is
+// used to indicate whether the run is a literal run or a repeated run. The rest
+// of the varint is used to determine the length of the run (eg how many times the
+// value repeats).
+//
+// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode
+// in groups of 8), so that no matter the bit-width of the value, the sequence will end
+// on a byte boundary without padding.
+// Given that we know it is a multiple of 8, we store the number of 8-groups rather than
+// the actual number of encoded ints. (This means that the total number of encoded values
+// can not be determined from the encoded data, since the number of values in the last
+// group may not be a multiple of 8).
+// There is a break-even point when it is more storage efficient to do run length
+// encoding.  For 1 bit-width values, that point is 8 values.  They require 2 bytes
+// for both the repeated encoding or the literal encoding.  This value can always
+// be computed based on the bit-width.
+// TODO: think about how to use this for strings.  The bit packing isn't quite the same.
+//
+// Examples with bit-width 1 (eg encoding booleans):
+// ----------------------------------------
+// 100 1s followed by 100 0s:
+// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte>
+//  - (total 4 bytes)
+//
+// alternating 1s and 0s (200 total):
+// 200 ints = 25 groups of 8
+// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
+// (total 26 bytes, 1 byte overhead)
+//
+
+// Decoder class for RLE encoded data.
+//
+// NOTE: the encoded format does not have any length prefix or any other way of
+// indicating that the encoded sequence ends at a certain point, so the Decoder
+// methods may return some extra bits at the end before the read methods start
+// to return 0/false.
+template<typename T>
+class RleDecoder {
+ public:
+  // Create a decoder object. buffer/buffer_len is the encoded data to read.
+  // bit_width is the width of each value (before encoding).
+  RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width)
+    : bit_reader_(buffer, buffer_len),
+      bit_width_(bit_width),
+      current_value_(0),
+      repeat_count_(0),
+      literal_count_(0),
+      rewind_state_(CANT_REWIND) {
+    DCHECK_GE(bit_width_, 1);
+    DCHECK_LE(bit_width_, 64);
+  }
+
+  RleDecoder() {}  // NOTE(review): the scalar members are left uninitialized here.
+
+  // Skip n values, and returns the number of non-zero entries skipped.
+  size_t Skip(size_t to_skip);
+
+  // Gets the next value.  Returns false if there are no more.
+  bool Get(T* val);
+
+  // Seek to the previous value. Only valid once after each Get().
+  void RewindOne();
+
+  // Gets the next run of the same 'val'. Returns 0 if there is no
+  // more data to be decoded. Will return a run of at most 'max_run'
+  // values. If there are more values than this, the next call to
+  // GetNextRun will return more from the same run.
+  size_t GetNextRun(T* val, size_t max_run);
+
+ private:
+  bool ReadHeader();  // Reads the next run header when no run is in progress.
+
+  enum RewindState {  // What RewindOne() has to undo.
+    REWIND_LITERAL,  // The last Get() consumed a literal value.
+    REWIND_RUN,  // The last Get() consumed a value from a repeated run.
+    CANT_REWIND  // Nothing to undo (initial state, or already rewound).
+  };
+
+  BitReader bit_reader_;
+  int bit_width_;
+  uint64_t current_value_;  // Value of the current repeated run.
+  uint32_t repeat_count_;  // Values left in the current repeated run.
+  uint32_t literal_count_;  // Values left in the current literal run.
+  RewindState rewind_state_;
+};
+
+// Class to incrementally build the rle data.
+// The encoding has two modes: encoding repeated runs and literal runs.
+// If the run is sufficiently short, it is more efficient to encode as a literal run.
+// This class does so by buffering 8 values at a time.  If they are not all the same
+// they are added to the literal run.  If they are the same, they are added to the
+// repeated run.  When we switch modes, the previous run is flushed out.
+template<typename T>
+class RleEncoder {
+ public:
+  // buffer: buffer to write bits to.
+  // bit_width: max number of bits for value.
+  // TODO: consider adding a min_repeated_run_length so the caller can control
+  // when values should be encoded as repeated runs.  Currently this is derived
+  // based on the bit_width, which can determine a storage optimal choice.
+  explicit RleEncoder(faststring *buffer, int bit_width)
+    : bit_width_(bit_width),
+      bit_writer_(buffer) {
+    DCHECK_GE(bit_width_, 1);
+    DCHECK_LE(bit_width_, 64);
+    Clear();
+  }
+
+  // Reserve 'num_bytes' bytes for a plain encoded header, set each
+  // byte with 'val': this is used for the RLE-encoded data blocks in
+  // order to be able to store the initial ordinal position
+  // and number of elements. This is a part of RleEncoder in order to
+  // maintain the correct offset in 'buffer'.
+  void Reserve(int num_bytes, uint8_t val);
+
+  // Encode value. This value must be representable with bit_width_ bits.
+  void Put(T value, size_t run_length = 1);
+
+  // Flushes any pending values to the underlying buffer.
+  // Returns the total number of bytes written
+  int Flush();
+
+  // Resets all the state in the encoder.
+  void Clear();
+
+  int32_t len() const { return bit_writer_.bytes_written(); }
+
+ private:
+  // Flushes any buffered values.  If this is part of a repeated run, this is largely
+  // a no-op.
+  // If it is part of a literal run, this will call FlushLiteralRun, which writes
+  // out the buffered literal values.
+  // If 'done' is true, the current run would be written even if it would normally
+  // have been buffered more.  This should only be called at the end, when the
+  // encoder has received all values even if it would normally continue to be
+  // buffered.
+  void FlushBufferedValues(bool done);
+
+  // Flushes literal values to the underlying buffer.  If update_indicator_byte,
+  // then the current literal run is complete and the indicator byte is updated.
+  void FlushLiteralRun(bool update_indicator_byte);
+
+  // Flushes a repeated run to the underlying buffer.
+  void FlushRepeatedRun();
+
+  // Number of bits needed to encode the value.
+  const int bit_width_;
+
+  // Underlying buffer.
+  BitWriter bit_writer_;
+
+  // We need to buffer at most 8 values for literals.  This happens when the
+  // bit_width is 1 (so 8 values fit in one byte).
+  // TODO: generalize this to other bit widths
+  uint64_t buffered_values_[8];
+
+  // Number of values in buffered_values_
+  int num_buffered_values_;
+
+  // The current (also last) value that was written and the count of how
+  // many times in a row that value has been seen.  This is maintained even
+  // if we are in a literal run.  If the repeat_count_ gets high enough, we switch
+  // to encoding repeated runs.
+  uint64_t current_value_;
+  int repeat_count_;
+
+  // Number of literals in the current run.  This does not include the literals
+  // that might be in buffered_values_.  Only after we've got a group big enough
+  // can we decide if they should be part of the literal_count_ or repeat_count_
+  int literal_count_;
+
+  // Index of a byte in the underlying buffer that stores the indicator byte.
+  // This is reserved as soon as we need a literal run but the value is written
+  // when the literal run is complete. We maintain an index rather than a pointer
+  // into the underlying buffer because the pointer value may become invalid if
+  // the underlying buffer is resized.
+  int literal_indicator_byte_idx_;
+};
+
+template<typename T>
+inline bool RleDecoder<T>::ReadHeader() {  // Loads the next run header if no run is in progress.
+  DCHECK(bit_reader_.is_initialized());
+  if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) {
+    // Read the next run's indicator int; it could be a literal or repeated run.
+    // The int is encoded as a vlq-encoded value.
+    int32_t indicator_value = 0;
+    bool result = bit_reader_.GetVlqInt(&indicator_value);
+    if (PREDICT_FALSE(!result)) {
+      return false;  // No further header could be read.
+    }
+
+    // The lsb indicates whether it is a literal run or a repeated run.
+    bool is_literal = indicator_value & 1;
+    if (is_literal) {
+      literal_count_ = (indicator_value >> 1) * 8;  // Header holds the number of 8-value groups.
+      DCHECK_GT(literal_count_, 0);
+    } else {
+      repeat_count_ = indicator_value >> 1;
+      DCHECK_GT(repeat_count_, 0);
+      bool result = bit_reader_.GetAligned<T>(  // Read the repeated value into current_value_.
+          BitUtil::Ceil(bit_width_, 8), reinterpret_cast<T*>(&current_value_));
+      DCHECK(result);
+    }
+  }
+  return true;
+}
+
+template<typename T>
+inline bool RleDecoder<T>::Get(T* val) {  // Decodes one value into *val; false if no run is left.
+  DCHECK(bit_reader_.is_initialized());
+  if (PREDICT_FALSE(!ReadHeader())) {
+    return false;
+  }
+
+  if (PREDICT_TRUE(repeat_count_ > 0)) {
+    *val = current_value_;  // Repeated run: hand out the stored value again.
+    --repeat_count_;
+    rewind_state_ = REWIND_RUN;
+  } else {
+    DCHECK(literal_count_ > 0);
+    bool result = bit_reader_.GetValue(bit_width_, val);  // Literal run: read the next value.
+    DCHECK(result);
+    --literal_count_;
+    rewind_state_ = REWIND_LITERAL;
+  }
+
+  return true;
+}
+
+template<typename T>
+inline void RleDecoder<T>::RewindOne() {  // Undoes the most recent Get().
+  DCHECK(bit_reader_.is_initialized());
+
+  switch (rewind_state_) {
+    case CANT_REWIND:
+      LOG(FATAL) << "Can't rewind more than once after each read!";
+      break;
+    case REWIND_RUN:
+      ++repeat_count_;  // Give the value back to the repeated run.
+      break;
+    case REWIND_LITERAL:
+      {
+        bit_reader_.Rewind(bit_width_);  // Presumably steps the reader back by one value.
+        ++literal_count_;
+        break;
+      }
+  }
+
+  rewind_state_ = CANT_REWIND;  // A second rewind without a new Get() is rejected above.
+}
+
+template<typename T>
+inline size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) {
+  DCHECK(bit_reader_.is_initialized());
+  DCHECK_GT(max_run, 0);
+  size_t ret = 0;  // Length of the run of '*val' found so far.
+  size_t rem = max_run;  // How many more values the caller will accept.
+  while (ReadHeader()) {
+    if (PREDICT_TRUE(repeat_count_ > 0)) {
+      if (PREDICT_FALSE(ret > 0 && *val != current_value_)) {  // A different value: run is over.
+        return ret;
+      }
+      *val = current_value_;
+      if (repeat_count_ >= rem) {
+        // The next run is longer than the amount of remaining data
+        // that the caller wants to read. Only consume it partially.
+        repeat_count_ -= rem;
+        ret += rem;
+        return ret;
+      }
+      ret += repeat_count_;  // Consume the whole repeated run and keep looking for more.
+      rem -= repeat_count_;
+      repeat_count_ = 0;
+    } else {
+      DCHECK(literal_count_ > 0);
+      if (ret == 0) {  // No value chosen yet: the first literal defines '*val'.
+        bool has_more = bit_reader_.GetValue(bit_width_, val);
+        DCHECK(has_more);
+        literal_count_--;
+        ret++;
+        rem--;
+      }
+
+      while (literal_count_ > 0) {
+        bool result = bit_reader_.GetValue(bit_width_, &current_value_);  // Peek the next literal.
+        DCHECK(result);
+        if (current_value_ != *val || rem == 0) {  // Different value, or the caller's limit is hit.
+          bit_reader_.Rewind(bit_width_);  // Leave the peeked value for the next call.
+          return ret;
+        }
+        ret++;
+        rem--;
+        literal_count_--;
+      }
+    }
+  }
+  return ret;
+ }
+
+template<typename T>
+inline size_t RleDecoder<T>::Skip(size_t to_skip) {  // Returns how many skipped values were non-zero.
+  DCHECK(bit_reader_.is_initialized());
+
+  size_t set_count = 0;
+  while (to_skip > 0) {
+    bool result = ReadHeader();
+    DCHECK(result);  // NOTE(review): if ReadHeader() fails, to_skip never shrinks below -- confirm.
+
+    if (PREDICT_TRUE(repeat_count_ > 0)) {
+      size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip;
+      repeat_count_ -= nskip;
+      to_skip -= nskip;
+      if (current_value_ != 0) {  // A non-zero repeated value counts once per skipped entry.
+        set_count += nskip;
+      }
+    } else {
+      DCHECK(literal_count_ > 0);
+      size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip;
+      literal_count_ -= nskip;
+      to_skip -= nskip;
+      while (nskip--) {  // Literals are read one by one so the non-zero ones can be counted.
+        T value = 0;
+        bool result = bit_reader_.GetValue(bit_width_, &value);
+        DCHECK(result);
+        if (value != 0) {
+          set_count++;
+        }
+      }
+    }
+  }
+  return set_count;
+}
+
+// This function buffers input values 8 at a time.  After seeing all 8 values,
+// it decides whether they should be encoded as a literal or repeated run.
+template<typename T>
+inline void RleEncoder<T>::Put(T value, size_t run_length) {
+  DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));  // Value must fit in bit_width_ bits.
+
+  // TODO(perf): remove the loop and use the repeat_count_
+  while (run_length--) {
+    if (PREDICT_TRUE(current_value_ == value)) {
+      ++repeat_count_;
+      if (repeat_count_ > 8) {
+        // This is just a continuation of the current run, no need to buffer the
+        // values.
+        // Note that this is the fast path for long repeated runs.
+        continue;
+      }
+    } else {
+      if (repeat_count_ >= 8) {
+        // We had a run that was long enough but it has ended.  Flush the
+        // current repeated run.
+        DCHECK_EQ(literal_count_, 0);
+        FlushRepeatedRun();
+      }
+      repeat_count_ = 1;  // 'value' starts a new candidate run.
+      current_value_ = value;
+    }
+
+    buffered_values_[num_buffered_values_] = value;
+    if (++num_buffered_values_ == 8) {  // A full group of 8 is buffered: decide how to encode it.
+      DCHECK_EQ(literal_count_ % 8, 0);
+      FlushBufferedValues(false);
+    }
+  }
+}
+
+template<typename T>
+inline void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) {
+  if (literal_indicator_byte_idx_ < 0) {
+    // The literal indicator byte has not been reserved yet, get one now.
+    literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1);
+    DCHECK_GE(literal_indicator_byte_idx_, 0);
+  }
+
+  // Write all the buffered values as bit packed literals
+  for (int i = 0; i < num_buffered_values_; ++i) {
+    bit_writer_.PutValue(buffered_values_[i], bit_width_);
+  }
+  num_buffered_values_ = 0;
+
+  if (update_indicator_byte) {
+    // At this point we need to write the indicator byte for the literal run.
+    // We only reserve one byte, to allow for streaming writes of literal values.
+    // The logic makes sure we flush literal runs often enough to not overrun
+    // the 1 byte.
+    int num_groups = BitUtil::Ceil(literal_count_, 8);
+    int32_t indicator_value = (num_groups << 1) | 1;  // The lsb of 1 marks a literal run.
+    DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);  // Must fit in the single reserved byte.
+    bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator_value;
+    literal_indicator_byte_idx_ = -1;  // The next literal run reserves a fresh indicator byte.
+    literal_count_ = 0;
+  }
+}
+
+// Emits the pending repeated run: a VLQ header carrying the repeat count, then
+// the repeated value written byte-aligned.  Resets the run and buffer counters.
+template<typename T>
+inline void RleEncoder<T>::FlushRepeatedRun() {
+  DCHECK_GT(repeat_count_, 0);
+  // Shifting left leaves the lsb at 0, which is what marks a repeated run.
+  const int32_t run_header = (repeat_count_ << 1);
+  bit_writer_.PutVlqInt(run_header);
+  // The value occupies the smallest whole number of bytes covering bit_width_.
+  bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8));
+  repeat_count_ = 0;
+  num_buffered_values_ = 0;
+}
+
+// Resolves the buffered values: they are either dropped because they belong to
+// a repeated run of 8+, or appended to the literal run ('done' forces its header).
+template<typename T>
+inline void RleEncoder<T>::FlushBufferedValues(bool done) {
+  if (repeat_count_ >= 8) {
+    // Clear the buffered values.  They are part of the repeated run now and we
+    // don't want to flush them out as literals.
+    num_buffered_values_ = 0;
+    if (literal_count_ != 0) {
+      // There was a current literal run.  All the values in it have been flushed
+      // but we still need to update the indicator byte.
+      DCHECK_EQ(literal_count_ % 8, 0);
+      DCHECK_EQ(repeat_count_, 8);
+      FlushLiteralRun(true);
+    }
+    DCHECK_EQ(literal_count_, 0);
+    return;
+  }
+
+  literal_count_ += num_buffered_values_;  // the buffered values join the literal run
+  int num_groups = BitUtil::Ceil(literal_count_, 8);
+  if (num_groups + 1 >= (1 << 6)) {  // i.e. num_groups >= 63
+    // We need to start a new literal run because the indicator byte we've reserved
+    // cannot store more values.
+    DCHECK_GE(literal_indicator_byte_idx_, 0);
+    FlushLiteralRun(true);
+  } else {
+    FlushLiteralRun(done);  // keep the run open unless the caller is finishing
+  }
+  repeat_count_ = 0;  // the values went out as literals; restart run tracking
+}
+
+// Writes 'num_bytes' copies of the byte 'val' through the bit writer.
+template<typename T>
+inline void RleEncoder<T>::Reserve(int num_bytes, uint8_t val) {
+  int remaining = num_bytes;
+  while (remaining-- > 0) {
+    bit_writer_.PutValue(val, 8);
+  }
+}
+
+// Writes out whatever run is still pending, flushes the bit writer and returns
+// the number of bytes written so far.
+template<typename T>
+inline int RleEncoder<T>::Flush() {
+  const bool has_pending =
+      literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0;
+  if (has_pending) {
+    // The pending data forms a pure repeated run when no literal run is open and
+    // the buffer is either empty or consists solely of the repeated value.
+    const bool no_open_literal = literal_count_ == 0;
+    const bool buffer_is_run =
+        num_buffered_values_ == 0 || repeat_count_ == num_buffered_values_;
+    if (repeat_count_ > 0 && no_open_literal && buffer_is_run) {
+      FlushRepeatedRun();
+    } else {
+      // Otherwise everything left over is emitted as literals and the literal
+      // run's header byte is finalized.
+      literal_count_ += num_buffered_values_;
+      FlushLiteralRun(true);
+      repeat_count_ = 0;
+    }
+  }
+  bit_writer_.Flush();
+  // Nothing may remain pending once the encoder has been flushed.
+  DCHECK_EQ(num_buffered_values_, 0);
+  DCHECK_EQ(literal_count_, 0);
+  DCHECK_EQ(repeat_count_, 0);
+  return bit_writer_.bytes_written();
+}
+
+// Resets the encoder to its empty state: clears the bit writer and zeroes all
+// run-tracking fields, marking the literal indicator byte as unreserved (-1).
+template<typename T>
+inline void RleEncoder<T>::Clear() {
+  bit_writer_.Clear();
+
+  // Literal-run state.
+  literal_indicator_byte_idx_ = -1;
+  literal_count_ = 0;
+  num_buffered_values_ = 0;
+
+  // Repeated-run state.
+  repeat_count_ = 0;
+  current_value_ = 0;
+}
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rle-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rle-test.cc b/be/src/kudu/util/rle-test.cc
new file mode 100644
index 0000000..185fed5
--- /dev/null
+++ b/be/src/kudu/util/rle-test.cc
@@ -0,0 +1,537 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <stdlib.h>
+#include <stdio.h>
+
+// Must come before gtest.h.
+#include "kudu/gutil/mathlimits.h"
+
+#include <boost/utility/binary.hpp>
+#include <gtest/gtest.h>
+#include <string>
+#include <vector>
+
+#include "kudu/util/rle-encoding.h"
+#include "kudu/util/bit-stream-utils.h"
+#include "kudu/util/hexdump.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+const int MAX_WIDTH = 64;  // upper bound of the bit-width loops in this file
+
+class TestRle : public KuduTest {};  // fixture for the TEST_F(TestRle, ...) cases
+
+// Writes two bytes of single-bit values with BitWriter, checks the exact bytes
+// produced, then reads the bits back with BitReader.
+TEST(BitArray, TestBool) {
+  const int len_bytes = 2;
+  faststring buffer(len_bytes);
+  BitWriter writer(&buffer);
+
+  // First byte: alternating 0's and 1's, starting with 0.
+  for (int bit = 0; bit < 8; ++bit) {
+    writer.PutValue(bit % 2, 1);
+  }
+  writer.Flush();
+  EXPECT_EQ(buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0));
+
+  // Second byte: 00110011, i.e. positions 0, 1, 4 and 5 are 0 and the rest 1.
+  // That is exactly bit 1 of the position index.
+  for (int bit = 0; bit < 8; ++bit) {
+    writer.PutValue((bit >> 1) & 1, 1);
+  }
+  writer.Flush();
+
+  // Validate the exact bit value
+  EXPECT_EQ(buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0));
+  EXPECT_EQ(buffer[1], BOOST_BINARY(1 1 0 0 1 1 0 0));
+
+  // Read everything back: first the alternating byte...
+  BitReader reader(buffer.data(), buffer.size());
+  for (int bit = 0; bit < 8; ++bit) {
+    bool val = false;
+    const bool result = reader.GetValue(1, &val);
+    EXPECT_TRUE(result);
+    EXPECT_EQ(val, bit % 2);
+  }
+
+  // ...then the 00110011 byte.
+  for (int bit = 0; bit < 8; ++bit) {
+    bool val = false;
+    const bool result = reader.GetValue(1, &val);
+    EXPECT_TRUE(result);
+    EXPECT_EQ(val, ((bit >> 1) & 1) != 0);
+  }
+}
+
+// Writes 'num_vals' values with width 'bit_width' and reads them back.
+// The i-th value is i reduced modulo 2^bit_width so that it fits in the width.
+// At width 64 no reduction is needed and i itself is used; the previous modulus
+// of 1 meant only zeros were ever written for that width.
+void TestBitArrayValues(int bit_width, int num_vals) {
+  const int kTestLen = BitUtil::Ceil(bit_width * num_vals, 8);
+  // Unsigned shift so that width 63 does not shift a signed 1 into the sign bit.
+  // The modulus is unused (and left at 1) for width 64.
+  const uint64_t mod = bit_width == 64 ? 1ULL : 1ULL << bit_width;
+
+  faststring buffer(kTestLen);
+  BitWriter writer(&buffer);
+  for (int i = 0; i < num_vals; ++i) {
+    const uint64_t to_write = (bit_width == 64) ? i : (i % mod);
+    writer.PutValue(to_write, bit_width);
+  }
+  writer.Flush();
+  EXPECT_EQ(writer.bytes_written(), kTestLen);
+
+  BitReader reader(buffer.data(), kTestLen);
+  for (int i = 0; i < num_vals; ++i) {
+    const uint64_t expected = (bit_width == 64) ? i : (i % mod);
+    uint64_t val = 0;
+    bool result = reader.GetValue(bit_width, &val);
+    EXPECT_TRUE(result);
+    EXPECT_EQ(val, expected);
+  }
+  // Every byte of the buffer must have been consumed.
+  EXPECT_EQ(reader.bytes_left(), 0);
+}
+
+// Runs TestBitArrayValues for every bit width with several value counts.
+TEST(BitArray, TestValues) {
+  int width = 1;
+  while (width <= MAX_WIDTH) {
+    // Don't write too many values: the "every value of this width" case is
+    // capped at 4096 entries once the width reaches 12 bits.
+    const int capped_count = (width >= 12) ? 4096 : (1 << width);
+    TestBitArrayValues(width, 1);
+    TestBitArrayValues(width, 2);
+    TestBitArrayValues(width, capped_count);
+    TestBitArrayValues(width, 1024);
+    ++width;
+  }
+}
+
+// Test some mixed values: 1-bit parity flags interleaved with 10-bit integers.
+// Note that kTestLenBits is used as the number of values written, not as a bit
+// count.
+TEST(BitArray, TestMixed) {
+  const int kTestLenBits = 1024;
+  faststring buffer(kTestLenBits / 8);
+  bool parity = true;
+
+  BitWriter writer(&buffer);
+  for (int i = 0; i < kTestLenBits; ++i) {
+    if (i % 2 == 0) {
+      writer.PutValue(parity, 1);
+      parity = !parity;
+    } else {
+      writer.PutValue(i, 10);
+    }
+  }
+  writer.Flush();
+
+  parity = true;
+  BitReader reader(buffer.data(), buffer.size());
+  for (int i = 0; i < kTestLenBits; ++i) {
+    // Both outputs are initialized so that a failed read produces a
+    // deterministic mismatch instead of comparing an indeterminate value.
+    bool result = false;
+    if (i % 2 == 0) {
+      bool val = false;
+      result = reader.GetValue(1, &val);
+      EXPECT_EQ(val, parity);
+      parity = !parity;
+    } else {
+      int val = 0;
+      result = reader.GetValue(10, &val);
+      EXPECT_EQ(val, i);
+    }
+    EXPECT_TRUE(result);
+  }
+}
+
+// Validates encoding of values by encoding and decoding them.  If
+// expected_encoding != NULL, also validates that the encoded buffer is
+// exactly 'expected_encoding'; this requires a real 'expected_len'.
+// If expected_len is not -1, it will validate the encoded size is correct.
+template<typename T>
+void ValidateRle(const vector<T>& values, int bit_width,
+    uint8_t* expected_encoding, int expected_len) {
+  faststring buffer;
+  RleEncoder<T> encoder(&buffer, bit_width);
+
+  for (const auto& value : values) {
+    encoder.Put(value);
+  }
+  int encoded_len = encoder.Flush();
+
+  if (expected_len != -1) {
+    EXPECT_EQ(encoded_len, expected_len);
+  }
+  if (expected_encoding != nullptr) {
+    // A byte comparison needs a real length; -1 would turn into a huge size.
+    ASSERT_GE(expected_len, 0) << "expected_encoding requires an explicit expected_len";
+    // Never read past the bytes the encoder actually produced.  A length
+    // mismatch has already been reported above, so comparing the common prefix
+    // is enough here.
+    const int cmp_len = encoded_len < expected_len ? encoded_len : expected_len;
+    EXPECT_EQ(memcmp(buffer.data(), expected_encoding, cmp_len), 0)
+      << "\n"
+      << "Expected: " << HexDump(Slice(expected_encoding, expected_len)) << "\n"
+      << "Got:      " << HexDump(Slice(buffer));
+  }
+
+  // Verify read: decoding must return exactly the values that were encoded.
+  RleDecoder<T> decoder(buffer.data(), encoded_len, bit_width);
+  for (const auto& value : values) {
+    T val = 0;
+    bool result = decoder.Get(&val);
+    EXPECT_TRUE(result);
+    EXPECT_EQ(value, val);
+  }
+}
+
+// Checks the exact encoded bytes (or at least the encoded length) for two fixed
+// inputs: a pair of long runs, and a strictly alternating sequence.
+TEST(Rle, SpecificSequences) {
+  const int kTestLen = 1024;
+  uint8_t expected_buffer[kTestLen];
+
+  // Test 50 0's followed by 50 1's
+  vector<uint64_t> values(50, 0ULL);
+  values.resize(100, 1ULL);
+
+  // Two repeated runs, each a header byte (count << 1) followed by the value.
+  // expected_buffer valid for bit width <= 1 byte
+  expected_buffer[0] = (50 << 1);
+  expected_buffer[1] = 0;
+  expected_buffer[2] = (50 << 1);
+  expected_buffer[3] = 1;
+  int width = 1;
+  for (; width <= 8; ++width) {
+    ValidateRle(values, width, expected_buffer, 4);
+  }
+  // Wider values need more bytes per run, so only the length is checked.
+  for (; width <= MAX_WIDTH; ++width) {
+    ValidateRle(values, width, nullptr, 2 * (1 + BitUtil::Ceil(width, 8)));
+  }
+
+  // Test 100 0's and 1's alternating
+  for (int pos = 0; pos < 100; ++pos) {
+    values[pos] = pos % 2;
+  }
+  // One literal run: a header byte followed by the bit-packed groups of 8.
+  const int num_groups = BitUtil::Ceil(100, 8);
+  const int full_groups = 100 / 8;
+  expected_buffer[0] = (num_groups << 1) | 1;
+  for (int group = 0; group < full_groups; ++group) {
+    expected_buffer[group + 1] = BOOST_BINARY(1 0 1 0 1 0 1 0); // 0xaa
+  }
+  // Values for the last 4 0 and 1's
+  expected_buffer[1 + full_groups] = BOOST_BINARY(0 0 0 0 1 0 1 0); // 0x0a
+
+  // num_groups and expected_buffer only valid for bit width = 1
+  ValidateRle(values, 1, expected_buffer, 1 + num_groups);
+  for (int w = 2; w <= MAX_WIDTH; ++w) {
+    ValidateRle(values, w, nullptr, 1 + BitUtil::Ceil(w * 100, 8));
+  }
+}
+
+// Round-trips 'num_vals' values of width 'bit_width' through ValidateRle.
+// When 'value' is not -1 every element is that value; otherwise the elements
+// count up from 0, wrapped to fit the width (no wrapping at width 64).
+void TestRleValues(int bit_width, int num_vals, int value = -1) {
+  const bool full_width = (bit_width == 64);
+  const uint64_t mod = full_width ? 1ULL : 1ULL << bit_width;
+  vector<uint64_t> values;
+  for (uint64_t v = 0; v < num_vals; ++v) {
+    uint64_t next;
+    if (value != -1) {
+      next = value;
+    } else if (full_width) {
+      next = v;
+    } else {
+      next = v % mod;
+    }
+    values.push_back(next);
+  }
+  ValidateRle(values, bit_width, nullptr, -1);
+}
+
+// Runs TestRleValues for every bit width: a single value, 1024 counting values,
+// and 1024 copies of 0 and of 1.
+TEST(Rle, TestValues) {
+  int width = 1;
+  while (width <= MAX_WIDTH) {
+    TestRleValues(width, 1);
+    TestRleValues(width, 1024);
+    TestRleValues(width, 1024, 0);
+    TestRleValues(width, 1024, 1);
+    ++width;
+  }
+}
+
+class BitRle : public KuduTest {  // fixture for the TEST_F(BitRle, ...) cases
+};
+
+// Tests inputs made of a single repeated bool: first all false, then all true.
+TEST_F(BitRle, AllSame) {
+  const int kTestLen = 1024;
+  vector<bool> values;
+
+  for (int v = 0; v < 2; ++v) {
+    values.assign(kTestLen, v != 0);
+    // 1024 identical 1-bit values are expected to encode to 3 bytes.
+    ValidateRle(values, 1, nullptr, 3);
+  }
+}
+
+// Test that writes out a repeated group and then a literal
+// group but flush before finishing.  The literal tail is grown one value at a
+// time and the whole sequence is validated after each step.
+TEST_F(BitRle, Flush) {
+  vector<bool> values(16, true);
+
+  values.push_back(false);
+  ValidateRle(values, 1, nullptr, -1);
+
+  for (int extra = 0; extra < 3; ++extra) {
+    values.push_back(true);
+    ValidateRle(values, 1, nullptr, -1);
+  }
+}
+
+// Test some random bool sequences: 1000 runs of alternating 0/1 values with
+// pseudo-random lengths, seeded per iteration, round-tripped at varying widths.
+TEST_F(BitRle, RandomBools) {
+  const int n_iters = AllowSlowTests() ? 1000 : 20;
+  for (int seed = 0; seed < n_iters; ++seed) {
+    srand(seed);
+    const int iters = seed + 1;
+    // With at most 1000 iterations this condition is never true.
+    if (iters % 10000 == 0) LOG(ERROR) << "Seed: " << iters;
+    vector<uint64_t > values;
+    bool parity = 0;
+    for (int run = 0; run < 1000; ++run) {
+      // Lengths 1..16 are used as drawn; draws of 17..20 collapse to 1.
+      int group_size = rand() % 20 + 1; // NOLINT(*)
+      if (group_size > 16) {
+        group_size = 1;
+      }
+      for (int k = 0; k < group_size; ++k) {
+        values.push_back(parity);
+      }
+      parity = !parity;
+    }
+    ValidateRle(values, (iters % MAX_WIDTH) + 1, nullptr, -1);
+  }
+}
+
+// Test some random 64-bit sequences: 1000 runs of pseudo-random values with
+// pseudo-random lengths, seeded per iteration and round-tripped at width 64.
+TEST_F(BitRle, Random64Bit) {
+  int iters = 0;
+  const int n_iters = AllowSlowTests() ? 1000 : 20;
+  while (iters < n_iters) {
+    srand(iters++);
+    if (iters % 10000 == 0) LOG(ERROR) << "Seed: " << iters;
+    vector<uint64_t > values;
+    for (int run = 0; run < 1000; ++run) {
+      int group_size = rand() % 20 + 1; // NOLINT(*)
+      // Draw the two halves in separate statements: inside a single expression
+      // the order of the two rand() calls is unspecified, which would make the
+      // seeded sequence differ between compilers.
+      const uint64_t high_half = static_cast<uint64_t>(rand()); // NOLINT(*)
+      const uint64_t low_half = static_cast<uint64_t>(rand()); // NOLINT(*)
+      const uint64_t cur_value = (high_half << 32) + low_half;
+      // Lengths 1..16 are used as drawn; draws of 17..20 collapse to 1.
+      if (group_size > 16) {
+        group_size = 1;
+      }
+      // A distinct index name avoids shadowing the run counter.
+      for (int k = 0; k < group_size; ++k) {
+        values.push_back(cur_value);
+      }
+    }
+    ValidateRle(values, 64, nullptr, -1);
+  }
+}
+
+// Test runs of growing and then shrinking length, where a run of length i holds
+// the value (i % 2): one 1, two 0's, three 1's, ... up to 32 and back down to 1.
+TEST_F(BitRle, RepeatedPattern) {
+  vector<bool> values;
+  const int min_run = 1;
+  const int max_run = 32;
+
+  // Appends 'run_length' copies of the value implied by the run's parity.
+  const auto append_run = [&values](int run_length) {
+    values.insert(values.end(), run_length, (run_length % 2) != 0);
+  };
+
+  for (int len = min_run; len <= max_run; ++len) {
+    append_run(len);
+  }
+
+  // And go back down again
+  for (int len = max_run; len >= min_run; --len) {
+    append_run(len);
+  }
+
+  ValidateRle(values, 1, nullptr, -1);
+}
+
+// Encodes runs with the bulk Put(value, run_length) overload and checks that
+// GetNextRun() reports them back, with the two adjacent 'true' runs merged.
+TEST_F(TestRle, TestBulkPut) {
+  faststring buffer(1);
+  RleEncoder<bool> encoder(&buffer, 1);
+  encoder.Put(true, 10);
+  encoder.Put(false, 7);
+  encoder.Put(true, 5);
+  encoder.Put(true, 15);
+  encoder.Flush();
+
+  struct ExpectedRun {
+    bool value;
+    size_t length;
+  };
+  const ExpectedRun kExpectedRuns[] = {
+    {true, 10},
+    {false, 7},
+    {true, 20},  // 5 + 15
+  };
+
+  bool val = false;
+  RleDecoder<bool> decoder(buffer.data(), encoder.len(), 1);
+  for (const ExpectedRun& expected : kExpectedRuns) {
+    const size_t run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+    ASSERT_EQ(expected.value, val);
+    ASSERT_EQ(expected.length, run_length);
+  }
+
+  // Nothing is left after the last run.
+  ASSERT_EQ(0, decoder.GetNextRun(&val, MathLimits<size_t>::kMax));
+}
+
+// Encodes alternating runs of a fixed block length and checks that GetNextRun()
+// returns each run with the right value and length.
+TEST_F(TestRle, TestGetNextRun) {
+  // Repeat the test with different number of items
+  for (int num_items = 7; num_items < 200; num_items += 13) {
+    // Test different block patterns
+    //    1: 01010101 01010101
+    //    2: 00110011 00110011
+    //    3: 00011100 01110001
+    //    ...
+    for (int block = 1; block <= 20; ++block) {
+      faststring buffer(1);
+      RleEncoder<bool> encoder(&buffer, 1);
+      for (int j = 0; j < num_items; ++j) {
+        encoder.Put(!!(j & 1), block);
+      }
+      encoder.Flush();
+
+      RleDecoder<bool> decoder(buffer.data(), encoder.len(), 1);
+      size_t count = num_items * block;
+      for (int j = 0; j < num_items; ++j) {
+        size_t run_length;
+        bool val = false;
+        // Test invariants use ASSERT_* rather than DCHECK_* so that they are
+        // still enforced when the test is built without debug checks.
+        ASSERT_GT(count, 0U);
+        run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+        run_length = std::min(run_length, count);
+
+        ASSERT_EQ(!!(j & 1), val);
+        ASSERT_EQ(block, run_length);
+        count -= run_length;
+      }
+      // Every encoded value must have been accounted for.
+      ASSERT_EQ(0U, count);
+    }
+  }
+}
+
+// Generate a random bit string which consists of 'num_runs' runs of alternating
+// values (starting with 0), each with a random length between 0 and 99, so a
+// run may be empty.  The encoding is written through 'enc_buf' and a '0'/'1'
+// character rendering of the bits is appended to 'string_rep'.  Returns the
+// number of values encoded (i.e. the sum of the run lengths).
+static size_t GenerateRandomBitString(int num_runs, faststring* enc_buf, string* string_rep) {
+  RleEncoder<bool> enc(enc_buf, 1);
+  // Accumulate in the return type instead of narrowing through an int.
+  size_t num_bits = 0;
+  for (int i = 0; i < num_runs; i++) {
+    const int run_length = random() % 100;
+    const bool value = static_cast<bool>(i & 1);
+    enc.Put(value, run_length);
+    string_rep->append(run_length, value ? '1' : '0');
+    num_bits += run_length;
+  }
+  enc.Flush();
+  return num_bits;
+}
+
+// Round-trips random bit strings through the encoder and reads them back with
+// GetNextRun() using a randomly chosen per-call limit.
+TEST_F(TestRle, TestRoundTripRandomSequencesWithRuns) {
+  SeedRandom();
+
+  // Test the limiting function of GetNextRun.
+  const int kMaxToReadAtOnce = (random() % 20) + 1;
+
+  // Generate a bunch of random bit sequences, and "round-trip" them
+  // through the encode/decode sequence.
+  for (int rep = 0; rep < 100; rep++) {
+    faststring buf;
+    string string_rep;
+    int num_bits = GenerateRandomBitString(10, &buf, &string_rep);
+    RleDecoder<bool> decoder(buf.data(), buf.size(), 1);
+
+    string roundtrip_str;
+    bool val;
+    int rem_to_read = num_bits;
+    while (rem_to_read > 0) {
+      // Never ask for more than the cap or than what is left to read.
+      const size_t run_len =
+          decoder.GetNextRun(&val, std::min(kMaxToReadAtOnce, rem_to_read));
+      if (run_len == 0) {
+        break;
+      }
+      ASSERT_LE(run_len, kMaxToReadAtOnce);
+      roundtrip_str.append(run_len, val ? '1' : '0');
+      rem_to_read -= run_len;
+    }
+
+    ASSERT_EQ(string_rep, roundtrip_str);
+  }
+}
+// Checks RleDecoder::Skip(): after each skip the returned count of set values
+// and the run found at the new position must match the layout drawn below.
+TEST_F(TestRle, TestSkip) {
+  faststring buffer(1);
+  RleEncoder<bool> encoder(&buffer, 1);
+
+  // 0101010[1] 01010101 01
+  //        "A"
+  for (int j = 0; j < 18; ++j) {
+    encoder.Put(!!(j & 1));
+  }
+
+  // 0011[00] 11001100 11001100 11001100 11001100
+  //      "B"
+  for (int j = 0; j < 19; ++j) {
+    encoder.Put(!!(j & 1), 2);
+  }
+
+  // 000000000000 11[1111111111] 000000000000 111111111111
+  //                   "C"
+  // 000000000000 111111111111 0[00000000000] 111111111111
+  //                                  "D"
+  // 000000000000 111111111111 000000000000 111111111111
+  for (int j = 0; j < 12; ++j) {
+    encoder.Put(!!(j & 1), 12);
+  }
+  encoder.Flush();
+
+  // One entry per marked position: how far to skip to get there, how many set
+  // values the skip must report, and the run expected at that position.
+  struct SkipCase {
+    size_t to_skip;
+    size_t expected_set;
+    bool expected_val;
+    size_t expected_run;
+  };
+  const SkipCase kCases[] = {
+    {7, 3, true, 1},      // position before "A"
+    {14, 7, false, 2},    // position before "B"
+    {46, 18, true, 10},   // position before "C"
+    {49, 24, false, 11},  // position before "D"
+  };
+
+  bool val = false;
+  RleDecoder<bool> decoder(buffer.data(), encoder.len(), 1);
+  for (const SkipCase& c : kCases) {
+    ASSERT_EQ(c.expected_set, decoder.Skip(c.to_skip));
+    const size_t run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+    ASSERT_EQ(c.expected_val, val);
+    ASSERT_EQ(c.expected_run, run_length);
+  }
+
+  encoder.Flush();
+}
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rolling_log-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rolling_log-test.cc b/be/src/kudu/util/rolling_log-test.cc
new file mode 100644
index 0000000..3c6f60b
--- /dev/null
+++ b/be/src/kudu/util/rolling_log-test.cc
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/rolling_log.h"
+
+#include <glog/logging.h>
+#include <glog/stl_logging.h>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/util/env.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+// Fixture that creates a dedicated log directory for each test and offers a
+// helper to inspect the log files found in it.
+class RollingLogTest : public KuduTest {
+ public:
+  RollingLogTest()
+    : log_dir_(GetTestPath("log_dir")) {
+  }
+
+  virtual void SetUp() OVERRIDE {
+    ASSERT_OK(env_->CreateDir(log_dir_));
+  }
+
+ protected:
+  // Lists log_dir_ (ignoring "." and ".."), stores the entries in 'children'
+  // and asserts that there are exactly 'expected_count' of them.  Every entry
+  // must start with "rolling_log-test.", contain ".mylog." and end with this
+  // process's pid, optionally followed by ".gz".
+  void AssertLogCount(int expected_count, vector<string>* children) {
+    vector<string> dir_entries;
+    ASSERT_OK(env_->GetChildren(log_dir_, &dir_entries));
+    children->clear();
+
+    const string pid_suffix = Substitute("$0", getpid());
+    const string gz_pid_suffix = pid_suffix + ".gz";
+    for (const string& entry : dir_entries) {
+      const bool is_dot_entry = (entry == "." || entry == "..");
+      if (is_dot_entry) {
+        continue;
+      }
+      children->push_back(entry);
+      ASSERT_TRUE(HasPrefixString(entry, "rolling_log-test."));
+      ASSERT_STR_CONTAINS(entry, ".mylog.");
+
+      ASSERT_TRUE(HasSuffixString(entry, pid_suffix) ||
+                  HasSuffixString(entry, gz_pid_suffix)) << "bad child: " << entry;
+    }
+    ASSERT_EQ(children->size(), expected_count) << *children;
+  }
+
+  const string log_dir_;
+};
+
+// Test with compression off: a 100-byte size limit must make the log roll over
+// to a second file, and the first file must stay within the limit.
+TEST_F(RollingLogTest, TestLog) {
+  RollingLog log(env_, log_dir_, "mylog");
+  log.SetCompressionEnabled(false);
+  log.SetSizeLimitBytes(100);
+
+  // Before writing anything, we shouldn't open a log file.
+  vector<string> children;
+  NO_FATALS(AssertLogCount(0, &children));
+
+  // Appending some data should write a new segment.
+  ASSERT_OK(log.Append("Hello world\n"));
+  NO_FATALS(AssertLogCount(1, &children));
+
+  // Ten more lines must leave exactly two files behind.
+  int remaining = 10;
+  while (remaining-- > 0) {
+    ASSERT_OK(log.Append("Hello world\n"));
+  }
+  NO_FATALS(AssertLogCount(2, &children));
+
+  // Inspect the first listed file: it must begin with the data written and
+  // respect the configured limit.
+  const string path = JoinPathSegments(log_dir_, children[0]);
+  faststring data;
+  ASSERT_OK(ReadFileToString(env_, path, &data));
+  ASSERT_TRUE(HasPrefixString(data.ToString(), "Hello world\n"))
+    << "Data missing";
+  ASSERT_LE(data.size(), 100) << "Size limit not respected";
+}
+
+// Test with compression on: after closing, the single log file must carry a
+// ".gz" suffix and be much smaller than the raw data that was appended.
+TEST_F(RollingLogTest, TestCompression) {
+  RollingLog log(env_, log_dir_, "mylog");
+  ASSERT_OK(log.Open());
+
+  StringPiece data = "Hello world\n";
+  int raw_size = 0;
+  int appended = 0;
+  while (appended < 1000) {
+    ASSERT_OK(log.Append(data));
+    raw_size += data.size();
+    ++appended;
+  }
+  ASSERT_OK(log.Close());
+
+  vector<string> children;
+  NO_FATALS(AssertLogCount(1, &children));
+  ASSERT_TRUE(HasSuffixString(children[0], ".gz"));
+
+  // Ensure that the output is actually gzipped: non-empty, but under a tenth
+  // of the raw size.
+  const string gz_path = JoinPathSegments(log_dir_, children[0]);
+  uint64_t size;
+  ASSERT_OK(env_->GetFileSize(gz_path, &size));
+  ASSERT_LT(size, raw_size / 10);
+  ASSERT_GT(size, 0);
+}
+
+} // namespace kudu



Mime
View raw message