impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From taras...@apache.org
Subject [08/30] incubator-impala git commit: IMPALA-4669: [KUTIL] Import kudu_util library from kudu@314c9d8
Date Sat, 17 Jun 2017 07:25:34 GMT
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/pb_util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pb_util.cc b/be/src/kudu/util/pb_util.cc
new file mode 100644
index 0000000..a83c309
--- /dev/null
+++ b/be/src/kudu/util/pb_util.cc
@@ -0,0 +1,956 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Some portions copyright (C) 2008, Google, inc.
+//
+// Utilities for working with protobufs.
+// Some of this code is cribbed from the protobuf source,
+// but modified to work with kudu's 'faststring' instead of STL strings.
+
+#include "kudu/util/pb_util.h"
+
+#include <deque>
+#include <initializer_list>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <sstream>
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include <glog/logging.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/descriptor.pb.h>
+#include <google/protobuf/descriptor_database.h>
+#include <google/protobuf/dynamic_message.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+#include <google/protobuf/message.h>
+#include <google/protobuf/message_lite.h>
+#include <google/protobuf/text_format.h>
+
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/escaping.h"
+#include "kudu/gutil/strings/fastmem.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/coding-inl.h"
+#include "kudu/util/coding.h"
+#include "kudu/util/crc.h"
+#include "kudu/util/debug/sanitizer_scopes.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/env.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/jsonwriter.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/pb_util-internal.h"
+#include "kudu/util/pb_util.pb.h"
+#include "kudu/util/status.h"
+
+using google::protobuf::Descriptor;
+using google::protobuf::DescriptorPool;
+using google::protobuf::DynamicMessageFactory;
+using google::protobuf::FieldDescriptor;
+using google::protobuf::FileDescriptor;
+using google::protobuf::FileDescriptorProto;
+using google::protobuf::FileDescriptorSet;
+using google::protobuf::io::ArrayInputStream;
+using google::protobuf::io::CodedInputStream;
+using google::protobuf::Message;
+using google::protobuf::MessageLite;
+using google::protobuf::Reflection;
+using google::protobuf::SimpleDescriptorDatabase;
+using google::protobuf::TextFormat;
+using kudu::crc::Crc;
+using kudu::pb_util::internal::SequentialFileFileInputStream;
+using kudu::pb_util::internal::WritableFileOutputStream;
+using std::deque;
+using std::endl;
+using std::initializer_list;
+using std::ostream;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+using strings::Utf8SafeCEscape;
+
+namespace std {
+
+// Allow the use of FileState with DCHECK_EQ.
+// The static_cast is required because FileState apparently has no implicit
+// conversion to int (likely an enum class) — the macro streams both operands,
+// so an operator<< must be visible at the macro expansion site.
+std::ostream& operator<< (std::ostream& os, const kudu::pb_util::FileState& state) {
+  os << static_cast<int>(state);
+  return os;
+}
+
+} // namespace std
+
+namespace kudu {
+namespace pb_util {
+
+// Suffix appended to temporary file names; the X's are the conventional
+// mkstemp-style placeholder replaced with random characters (used by
+// WritePBToPath() via Env::NewTempWritableFile()).
+static const char* const kTmpTemplateSuffix = ".XXXXXX";
+
+// Protobuf container constants.
+// Version 0 is never written to disk; it marks a not-yet-initialized
+// reader/writer (see the ReadablePBContainerFile constructor).
+static const uint32_t kPBContainerInvalidVersion = 0;
+static const uint32_t kPBContainerDefaultVersion = 2;
+static const int kPBContainerChecksumLen = sizeof(uint32_t);
+static const char kPBContainerMagic[] = "kuducntr";
+static const int kPBContainerMagicLen = 8;
+static const int kPBContainerV1HeaderLen =
+    kPBContainerMagicLen + sizeof(uint32_t); // Magic number + version.
+static const int kPBContainerV2HeaderLen =
+    kPBContainerV1HeaderLen + kPBContainerChecksumLen; // Same as V1 plus a checksum.
+
+const int kPBContainerMinimumValidLength = kPBContainerV1HeaderLen;
+
+// The "- 1" excludes the string literal's NUL terminator.
+static_assert(arraysize(kPBContainerMagic) - 1 == kPBContainerMagicLen,
+              "kPBContainerMagic does not match expected length");
+
+namespace {
+
+// When serializing, we first compute the byte size, then serialize the message.
+// If serialization produces a different number of bytes than expected, we
+// call this function, which crashes.  The problem could be due to a bug in the
+// protobuf implementation but is more likely caused by concurrent modification
+// of the message.  This function attempts to distinguish between the two and
+// provide a useful error message.
+// Crashes with a diagnostic describing which of the two size invariants was
+// violated. Never returns: one of the CHECK_EQs below must fail, and the
+// trailing LOG(FATAL) guards against being called with consistent sizes.
+void ByteSizeConsistencyError(int byte_size_before_serialization,
+                              int byte_size_after_serialization,
+                              int bytes_produced_by_serialization) {
+  // ByteSize() computed before serialization vs. recomputed after: a mismatch
+  // implies the message was mutated while we were serializing it.
+  CHECK_EQ(byte_size_before_serialization, byte_size_after_serialization)
+      << "Protocol message was modified concurrently during serialization.";
+  CHECK_EQ(bytes_produced_by_serialization, byte_size_before_serialization)
+      << "Byte size calculation and serialization were inconsistent.  This "
+         "may indicate a bug in protocol buffers or it may be caused by "
+         "concurrent modification of the message.";
+  LOG(FATAL) << "This shouldn't be called if all the sizes are equal.";
+}
+
+// Builds a message of the form:
+//   Can't <action> message of type "<type>" because it is missing required
+//   fields: <fields>
+// 'action' is a verb such as "serialize" or "parse" (see callers below).
+string InitializationErrorMessage(const char* action,
+                                  const MessageLite& message) {
+  // Note:  We want to avoid depending on strutil in the lite library, otherwise
+  //   we'd use:
+  //
+  // return strings::Substitute(
+  //   "Can't $0 message of type \"$1\" because it is missing required "
+  //   "fields: $2",
+  //   action, message.GetTypeName(),
+  //   message.InitializationErrorString());
+
+  string result;
+  result += "Can't ";
+  result += action;
+  result += " message of type \"";
+  result += message.GetTypeName();
+  result += "\" because it is missing required fields: ";
+  result += message.InitializationErrorString();
+  return result;
+}
+
// Returns true iff the specified protobuf container file version is supported
// by this implementation. Only versions 1 and 2 are currently supported.
bool IsSupportedContainerVersion(uint32_t version) {
  switch (version) {
    case 1:
    case 2:
      return true;
    default:
      return false;
  }
}
+
+// Reads exactly 'length' bytes from the container file into 'scratch',
+// validating that there is sufficient data in the file to read this length
+// before attempting to do so, and validating that it has read that length
+// after performing the read.
+//
+// If the file size is less than the requested size of the read, returns
+// Status::Incomplete.
+// If there is an unexpected short read, returns Status::Corruption.
+//
+// A Slice of the bytes read into 'scratch' is returned in 'result'.
+template<typename ReadableFileType>
+Status ValidateAndReadData(ReadableFileType* reader, uint64_t file_size,
+                           uint64_t* offset, uint64_t length,
+                           Slice* result, unique_ptr<uint8_t[]>* scratch) {
+  // Validate the read length using the file size.
+  if (*offset + length > file_size) {
+    return Status::Incomplete("File size not large enough to be valid",
+                              Substitute("Proto container file $0: "
+                                  "Tried to read $1 bytes at offset "
+                                  "$2 but file size is only $3 bytes",
+                                  reader->filename(), length,
+                                  *offset, file_size));
+  }
+
+  // Perform the read.
+  unique_ptr<uint8_t[]> local_scratch(new uint8_t[length]);
+  Slice s(local_scratch.get(), length);
+  RETURN_NOT_OK(reader->Read(*offset, &s));
+  CHECK_EQ(length, s.size()) // Should never trigger due to contract with reader APIs.
+      << Substitute("Unexpected short read: Proto container file $0: Tried to read $1 bytes "
+                    "but only read $2 bytes",
+                    reader->filename(), length, s.size());
+
+  // Only advance the caller's offset and hand over the buffer on success, so
+  // a failed read leaves the caller's state untouched.
+  *offset += length;
+  *result = s;
+  scratch->swap(local_scratch);
+  return Status::OK();
+}
+
+// Helper macro for use with ParseAndCompareChecksum(). Example usage:
+// RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(checksum.data(), { data }),
+//    CHECKSUM_ERR_MSG("Data checksum does not match", filename, offset));
+#define CHECKSUM_ERR_MSG(prefix, filename, cksum_offset) \
+  Substitute("$0: Incorrect checksum in file $1 at offset $2", prefix, filename, cksum_offset)
+
+// Parses a checksum from the specified buffer and compares it to the bytes
+// given in 'slices' by calculating a rolling CRC32 checksum of the bytes in
+// the 'slices'.
+// If they match, returns OK. Otherwise, returns Status::Corruption.
+// 'checksum_buf' must hold at least kPBContainerChecksumLen (4) bytes.
+Status ParseAndCompareChecksum(const uint8_t* checksum_buf,
+                               const initializer_list<Slice>& slices) {
+  uint32_t written_checksum = DecodeFixed32(checksum_buf);
+  // Crc::Compute() accumulates into a 64-bit value; the rolling calls chain
+  // the CRC across all slices as if they were one contiguous buffer.
+  uint64_t actual_checksum = 0;
+  Crc* crc32c = crc::GetCrc32cInstance();
+  for (Slice s : slices) {
+    crc32c->Compute(s.data(), s.size(), &actual_checksum);
+  }
+  if (PREDICT_FALSE(actual_checksum != written_checksum)) {
+    return Status::Corruption(Substitute("Checksum does not match. Expected: $0. Actual: $1",
+                                         written_checksum, actual_checksum));
+  }
+  return Status::OK();
+}
+
+// Read and parse a message of the specified format at the given offset in the
+// format documented in pb_util.h. 'offset' is an in-out parameter and will be
+// updated with the new offset on success. On failure, 'offset' is not modified.
+template<typename ReadableFileType>
+Status ReadPBStartingAt(ReadableFileType* reader, int version, uint64_t* offset, Message* msg) {
+  uint64_t tmp_offset = *offset;
+  VLOG(1) << "Reading PB with version " << version << " starting at offset " << *offset;
+
+  uint64_t file_size;
+  RETURN_NOT_OK(reader->Size(&file_size));
+  // A record starting exactly at EOF is a clean end of file, not corruption.
+  if (tmp_offset == file_size) {
+    return Status::EndOfFile("Reached end of file");
+  }
+
+  // Read the data length from the file.
+  // Version 2+ includes a checksum for the length field.
+  uint64_t length_buflen = (version == 1) ? sizeof(uint32_t)
+                                          : sizeof(uint32_t) + kPBContainerChecksumLen;
+  Slice len_and_cksum_slice;
+  unique_ptr<uint8_t[]> length_scratch;
+  RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, length_buflen,
+                                            &len_and_cksum_slice, &length_scratch),
+                        Substitute("Could not read data length from proto container file $0 "
+                                   "at offset $1", reader->filename(), *offset));
+  Slice length(len_and_cksum_slice.data(), sizeof(uint32_t));
+
+  // Versions >= 2 have an individual checksum for the data length.
+  if (version >= 2) {
+    Slice length_checksum(len_and_cksum_slice.data() + sizeof(uint32_t), kPBContainerChecksumLen);
+    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(length_checksum.data(), { length }),
+        CHECKSUM_ERR_MSG("Data length checksum does not match",
+                         reader->filename(), tmp_offset - kPBContainerChecksumLen));
+  }
+  uint32_t data_length = DecodeFixed32(length.data());
+
+  // Read body and checksum into buffer for checksum & parsing.
+  uint64_t data_and_cksum_buflen = data_length + kPBContainerChecksumLen;
+  Slice body_and_cksum_slice;
+  unique_ptr<uint8_t[]> body_scratch;
+  RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, data_and_cksum_buflen,
+                                            &body_and_cksum_slice, &body_scratch),
+                        Substitute("Could not read PB message data from proto container file $0 "
+                                   "at offset $1",
+                                   reader->filename(), tmp_offset));
+  Slice body(body_and_cksum_slice.data(), data_length);
+  Slice record_checksum(body_and_cksum_slice.data() + data_length, kPBContainerChecksumLen);
+
+  // Version 1 has a single checksum for length, body.
+  // Version 2+ has individual checksums for length and body, respectively.
+  if (version == 1) {
+    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(record_checksum.data(), { length, body }),
+        CHECKSUM_ERR_MSG("Length and data checksum does not match",
+                         reader->filename(), tmp_offset - kPBContainerChecksumLen));
+  } else {
+    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(record_checksum.data(), { body }),
+        CHECKSUM_ERR_MSG("Data checksum does not match",
+                         reader->filename(), tmp_offset - kPBContainerChecksumLen));
+  }
+
+  // The checksum is correct. Time to decode the body.
+  //
+  // We could compare pb_type_ against msg.GetTypeName(), but:
+  // 1. pb_type_ is not available when reading the supplemental header,
+  // 2. ParseFromArray() should fail if the data cannot be parsed into the
+  //    provided message type.
+
+  // To permit parsing of very large PB messages, we must use parse through a
+  // CodedInputStream and bump the byte limit. The SetTotalBytesLimit() docs
+  // say that 512MB is the shortest theoretical message length that may produce
+  // integer overflow warnings, so that's what we'll use.
+  // NOTE(review): the two-argument SetTotalBytesLimit() overload is deprecated
+  // and removed in newer protobuf releases; fine for the version vendored here,
+  // but confirm if protobuf is ever upgraded.
+  ArrayInputStream ais(body.data(), body.size());
+  CodedInputStream cis(&ais);
+  cis.SetTotalBytesLimit(512 * 1024 * 1024, -1);
+  if (PREDICT_FALSE(!msg->ParseFromCodedStream(&cis))) {
+    return Status::IOError("Unable to parse PB from path", reader->filename());
+  }
+
+  // Success: publish the advanced offset to the caller.
+  *offset = tmp_offset;
+  return Status::OK();
+}
+
+// Wrapper around ReadPBStartingAt() to enforce that we don't return
+// Status::Incomplete() for V1 format files.
+template<typename ReadableFileType>
+Status ReadFullPB(ReadableFileType* reader, int version, uint64_t* offset, Message* msg) {
+  Status s = ReadPBStartingAt(reader, version, offset, msg);
+  // A partial V1 record is unrecoverable, so upgrade Incomplete to Corruption;
+  // for V2+ the caller may treat Incomplete as a torn trailing write.
+  if (PREDICT_FALSE(s.IsIncomplete() && version == 1)) {
+    return Status::Corruption("Unrecoverable incomplete record", s.ToString());
+  }
+  return s;
+}
+
+// Read and parse the protobuf container file-level header documented in pb_util.h.
+// On success, advances '*offset' past the header and stores the container
+// version in '*version'. On failure, neither out-parameter is modified.
+template<typename ReadableFileType>
+Status ParsePBFileHeader(ReadableFileType* reader, uint64_t* offset, int* version) {
+  uint64_t file_size;
+  RETURN_NOT_OK(reader->Size(&file_size));
+
+  // We initially read enough data for a V2+ file header. This optimizes for
+  // V2+ and is valid on a V1 file because we don't consider these files valid
+  // unless they contain a record in addition to the file header. The
+  // additional 4 bytes required by a V2+ header (vs V1) is still less than the
+  // minimum number of bytes required for a V1 format data record.
+  uint64_t tmp_offset = *offset;
+  Slice header;
+  unique_ptr<uint8_t[]> scratch;
+  RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, kPBContainerV2HeaderLen,
+                                            &header, &scratch),
+                        Substitute("Could not read header for proto container file $0",
+                                   reader->filename()));
+  Slice magic_and_version(header.data(), kPBContainerMagicLen + sizeof(uint32_t));
+  Slice checksum(header.data() + kPBContainerMagicLen + sizeof(uint32_t), kPBContainerChecksumLen);
+
+  // Validate magic number.
+  if (PREDICT_FALSE(!strings::memeq(kPBContainerMagic, header.data(), kPBContainerMagicLen))) {
+    string file_magic(reinterpret_cast<const char*>(header.data()), kPBContainerMagicLen);
+    return Status::Corruption("Invalid magic number",
+                              Substitute("Expected: $0, found: $1",
+                                         Utf8SafeCEscape(kPBContainerMagic),
+                                         Utf8SafeCEscape(file_magic)));
+  }
+
+  // Validate container file version.
+  uint32_t tmp_version = DecodeFixed32(header.data() + kPBContainerMagicLen);
+  if (PREDICT_FALSE(!IsSupportedContainerVersion(tmp_version))) {
+    return Status::NotSupported(
+        Substitute("Protobuf container has unsupported version: $0. Default version: $1",
+                   tmp_version, kPBContainerDefaultVersion));
+  }
+
+  // Versions >= 2 have a checksum after the magic number and encoded version
+  // to ensure the integrity of these fields.
+  if (tmp_version >= 2) {
+    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(checksum.data(), { magic_and_version }),
+        CHECKSUM_ERR_MSG("File header checksum does not match",
+                         reader->filename(), tmp_offset - kPBContainerChecksumLen));
+  } else {
+    // Version 1 doesn't have a header checksum. Rewind our read offset so this
+    // data will be read again when we next attempt to read a data record.
+    tmp_offset -= kPBContainerChecksumLen;
+  }
+
+  *offset = tmp_offset;
+  *version = tmp_version;
+  return Status::OK();
+}
+
+// Read and parse the supplemental header from the container file.
+// The supplemental header is an ordinary container record holding a
+// ContainerSupHeaderPB, read with ReadFullPB() like any other record;
+// this wrapper only adds error-message context.
+template<typename ReadableFileType>
+Status ReadSupplementalHeader(ReadableFileType* reader, int version, uint64_t* offset,
+                              ContainerSupHeaderPB* sup_header) {
+  RETURN_NOT_OK_PREPEND(ReadFullPB(reader, version, offset, sup_header),
+      Substitute("Could not read supplemental header from proto container file $0 "
+                 "with version $1 at offset $2",
+                 reader->filename(), version, *offset));
+  return Status::OK();
+}
+
+} // anonymous namespace
+
+// Serializes 'msg' and appends the bytes to 'output'. DCHECKs (debug builds
+// only) that all required fields of 'msg' are set.
+void AppendToString(const MessageLite &msg, faststring *output) {
+  DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg);
+  AppendPartialToString(msg, output);
+}
+
+// Like AppendToString() but permits uninitialized (missing required fields)
+// messages. Existing contents of 'output' are preserved.
+void AppendPartialToString(const MessageLite &msg, faststring* output) {
+  size_t old_size = output->size();
+  int byte_size = msg.ByteSize();
+  // Messages >2G cannot be serialized due to overflow computing ByteSize.
+  DCHECK_GE(byte_size, 0) << "Error computing ByteSize";
+
+  output->resize(old_size + static_cast<size_t>(byte_size));
+
+  // Serialize directly into the tail of the buffer, then verify that the
+  // number of bytes written matches the precomputed size.
+  uint8* start = &((*output)[old_size]);
+  uint8* end = msg.SerializeWithCachedSizesToArray(start);
+  if (end - start != byte_size) {
+    ByteSizeConsistencyError(byte_size, msg.ByteSize(), end - start);
+  }
+}
+
+// Serializes 'msg' into 'output', replacing any existing contents.
+void SerializeToString(const MessageLite &msg, faststring *output) {
+  output->clear();
+  AppendToString(msg, output);
+}
+
+// Parses a message from 'rfile', reading it to completion.
+// I/O failures surface as the underlying stream status; anything else that
+// makes parsing fail is reported as Corruption.
+Status ParseFromSequentialFile(MessageLite *msg, SequentialFile *rfile) {
+  SequentialFileFileInputStream istream(rfile);
+  if (!msg->ParseFromZeroCopyStream(&istream)) {
+    RETURN_NOT_OK(istream.status());
+
+    // If it's not a file IO error then it's a parsing error.
+    // Probably, we read wrong or damaged data here.
+    return Status::Corruption("Error parsing msg", InitializationErrorMessage("parse", *msg));
+  }
+  return Status::OK();
+}
+
+// Parses a message from an in-memory buffer of 'length' bytes.
+Status ParseFromArray(MessageLite* msg, const uint8_t* data, uint32_t length) {
+  if (!msg->ParseFromArray(data, length)) {
+    return Status::Corruption("Error parsing msg", InitializationErrorMessage("parse", *msg));
+  }
+  return Status::OK();
+}
+
+// Writes 'msg' to 'path' atomically: the message is first serialized into a
+// uniquely-named temporary file next to 'path', then renamed over it, so a
+// crash mid-write never leaves a partially-written file at 'path'.
+Status WritePBToPath(Env* env, const std::string& path,
+                     const MessageLite& msg,
+                     SyncMode sync) {
+  const string tmp_template = path + kTmpInfix + kTmpTemplateSuffix;
+  string tmp_path;
+
+  unique_ptr<WritableFile> file;
+  RETURN_NOT_OK(env->NewTempWritableFile(WritableFileOptions(), tmp_template, &tmp_path, &file));
+  // Ensure the temporary file is deleted on any early return below.
+  env_util::ScopedFileDeleter tmp_deleter(env, tmp_path);
+
+  WritableFileOutputStream ostream(file.get());
+  bool res = msg.SerializeToZeroCopyStream(&ostream);
+  if (!res || !ostream.Flush()) {
+    return Status::IOError("Unable to serialize PB to file");
+  }
+
+  if (sync == pb_util::SYNC) {
+    RETURN_NOT_OK_PREPEND(file->Sync(), "Failed to Sync() " + tmp_path);
+  }
+  RETURN_NOT_OK_PREPEND(file->Close(), "Failed to Close() " + tmp_path);
+  RETURN_NOT_OK_PREPEND(env->RenameFile(tmp_path, path), "Failed to rename tmp file to " + path);
+  // The rename succeeded; the temp file name no longer exists, so disarm the
+  // deleter. Sync the parent directory to persist the rename itself.
+  tmp_deleter.Cancel();
+  if (sync == pb_util::SYNC) {
+    RETURN_NOT_OK_PREPEND(env->SyncDir(DirName(path)), "Failed to SyncDir() parent of " + path);
+  }
+  return Status::OK();
+}
+
+// Reads and parses 'msg' from the file at 'path'.
+// Inverse of WritePBToPath().
+Status ReadPBFromPath(Env* env, const std::string& path, MessageLite* msg) {
+  shared_ptr<SequentialFile> rfile;
+  RETURN_NOT_OK(env_util::OpenFileForSequential(env, path, &rfile));
+  RETURN_NOT_OK(ParseFromSequentialFile(msg, rfile.get()));
+  return Status::OK();
+}
+
// Truncates '*s' to at most 'max_len' characters of original content, marking
// the cut with a "<truncated>" suffix. Strings already within the limit are
// left untouched. Note: a truncated result is longer than 'max_len' because
// of the appended marker; this matches the original behavior.
static void TruncateString(std::string* s, int max_len) {
  // The comparison converts 'max_len' to size_t, exactly as the original
  // implicit signed/unsigned comparison did.
  if (s->size() <= static_cast<size_t>(max_len)) {
    return;
  }
  s->resize(max_len);
  s->append("<truncated>");
}
+
+// Recursively truncates every set string field in 'message' (including
+// repeated fields and nested sub-messages) to at most 'max_len' characters,
+// appending a "<truncated>" marker via TruncateString().
+void TruncateFields(Message* message, int max_len) {
+  const Reflection* reflection = message->GetReflection();
+  vector<const FieldDescriptor*> fields;
+  // ListFields() returns only fields that are actually present.
+  reflection->ListFields(*message, &fields);
+  for (const FieldDescriptor* field : fields) {
+    if (field->is_repeated()) {
+      for (int i = 0; i < reflection->FieldSize(*message, field); i++) {
+        switch (field->cpp_type()) {
+          case FieldDescriptor::CPPTYPE_STRING: {
+            // Passing nullptr scratch asks reflection for a reference to the
+            // string stored in the message itself, so the const_cast below
+            // mutates the message's own storage in place — assumes standard
+            // protobuf reflection behavior (no proxy copy); confirm on upgrade.
+            const string& s_const = reflection->GetRepeatedStringReference(*message, field, i,
+                                                                           nullptr);
+            TruncateString(const_cast<string*>(&s_const), max_len);
+            break;
+          }
+          case FieldDescriptor::CPPTYPE_MESSAGE: {
+            TruncateFields(reflection->MutableRepeatedMessage(message, field, i), max_len);
+            break;
+          }
+          default:
+            break;
+        }
+      }
+    } else {
+      switch (field->cpp_type()) {
+        case FieldDescriptor::CPPTYPE_STRING: {
+          // Same in-place mutation technique as the repeated case above.
+          const string& s_const = reflection->GetStringReference(*message, field, nullptr);
+          TruncateString(const_cast<string*>(&s_const), max_len);
+          break;
+        }
+        case FieldDescriptor::CPPTYPE_MESSAGE: {
+          TruncateFields(reflection->MutableMessage(message, field), max_len);
+          break;
+        }
+        default:
+          break;
+      }
+    }
+  }
+}
+
+namespace {
+// TextFormat printer hook that redacts the values of string/bytes fields
+// whose field options carry the REDACT extension, used by the
+// SecureDebugString()/SecureShortDebugString() functions below.
+class SecureFieldPrinter : public TextFormat::FieldValuePrinter {
+ public:
+  using super = TextFormat::FieldValuePrinter;
+
+  string PrintFieldName(const Message& message,
+                        const Reflection* reflection,
+                        const FieldDescriptor* field) const override {
+    // Remember whether the upcoming value belongs to a REDACT-marked string
+    // field; TextFormat prints the field name before its value, so the flag
+    // is consumed by the very next PrintString()/PrintBytes() call.
+    hide_next_string_ = field->cpp_type() == FieldDescriptor::CPPTYPE_STRING &&
+        field->options().GetExtension(REDACT);
+    return super::PrintFieldName(message, reflection, field);
+  }
+
+  string PrintString(const string& val) const override {
+    if (hide_next_string_) {
+      hide_next_string_ = false;
+      return KUDU_REDACT(super::PrintString(val));
+    }
+    return super::PrintString(val);
+  }
+  string PrintBytes(const string& val) const override {
+    if (hide_next_string_) {
+      hide_next_string_ = false;
+      return KUDU_REDACT(super::PrintBytes(val));
+    }
+    return super::PrintBytes(val);
+  }
+
+  // mutable: the Print* hooks are const but must carry state between calls.
+  mutable bool hide_next_string_ = false;
+};
+} // anonymous namespace
+
+// Like Message::DebugString(), but values of REDACT-marked string fields are
+// redacted via SecureFieldPrinter.
+string SecureDebugString(const Message& msg) {
+  string debug_string;
+  TextFormat::Printer printer;
+  // Printer takes ownership of the heap-allocated field printer.
+  printer.SetDefaultFieldValuePrinter(new SecureFieldPrinter());
+  printer.PrintToString(msg, &debug_string);
+  return debug_string;
+}
+
+// Like Message::ShortDebugString() (single-line output), with the same
+// redaction as SecureDebugString().
+string SecureShortDebugString(const Message& msg) {
+  string debug_string;
+
+  TextFormat::Printer printer;
+  printer.SetSingleLineMode(true);
+  printer.SetDefaultFieldValuePrinter(new SecureFieldPrinter());
+
+  printer.PrintToString(msg, &debug_string);
+  // Single line mode currently might have an extra space at the end.
+  if (!debug_string.empty() &&
+      debug_string[debug_string.size() - 1] == ' ') {
+    debug_string.resize(debug_string.size() - 1);
+  }
+
+  return debug_string;
+}
+
+
+// Takes shared ownership of 'writer'. The file is not usable until
+// CreateNew() or OpenExisting() transitions state_ to OPEN.
+WritablePBContainerFile::WritablePBContainerFile(shared_ptr<RWFile> writer)
+  : state_(FileState::NOT_INITIALIZED),
+    offset_(0),
+    version_(kPBContainerDefaultVersion),
+    writer_(std::move(writer)) {
+}
+
+WritablePBContainerFile::~WritablePBContainerFile() {
+  // Best-effort close; failures are logged, not propagated (dtors can't).
+  WARN_NOT_OK(Close(), "Could not Close() when destroying file");
+}
+
+// Test-only hook to force writing an older container version. Must be called
+// before CreateNew()/OpenExisting().
+Status WritablePBContainerFile::SetVersionForTests(int version) {
+  DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
+  if (!IsSupportedContainerVersion(version)) {
+    return Status::NotSupported(Substitute("Version $0 is not supported", version));
+  }
+  version_ = version;
+  return Status::OK();
+}
+
+// Writes the container file header (magic, version, V2+ header checksum)
+// followed by the supplemental header record describing the schema of 'msg'.
+// 'msg' itself is NOT written; it only supplies the descriptor and type name.
+Status WritablePBContainerFile::CreateNew(const Message& msg) {
+  DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
+
+  const uint64_t kHeaderLen = (version_ == 1) ? kPBContainerV1HeaderLen
+                                              : kPBContainerV1HeaderLen + kPBContainerChecksumLen;
+
+  faststring buf;
+  buf.resize(kHeaderLen);
+
+  // Serialize the magic.
+  strings::memcpy_inlined(buf.data(), kPBContainerMagic, kPBContainerMagicLen);
+  uint64_t offset = kPBContainerMagicLen;
+
+  // Serialize the version.
+  InlineEncodeFixed32(buf.data() + offset, version_);
+  offset += sizeof(uint32_t);
+  DCHECK_EQ(kPBContainerV1HeaderLen, offset)
+    << "Serialized unexpected number of total bytes";
+
+  // Versions >= 2: Checksum the magic and version.
+  if (version_ >= 2) {
+    uint32_t header_checksum = crc::Crc32c(buf.data(), offset);
+    InlineEncodeFixed32(buf.data() + offset, header_checksum);
+    offset += sizeof(uint32_t);
+  }
+
+  // Serialize the supplemental header.
+  ContainerSupHeaderPB sup_header;
+  PopulateDescriptorSet(msg.GetDescriptor()->file(),
+                        sup_header.mutable_protos());
+  sup_header.set_pb_type(msg.GetTypeName());
+  RETURN_NOT_OK_PREPEND(AppendMsgToBuffer(sup_header, &buf),
+                        "Failed to prepare supplemental header for writing");
+
+  // Write the serialized buffer to the file.
+  RETURN_NOT_OK_PREPEND(AppendBytes(buf),
+                        "Failed to append header to file");
+  state_ = FileState::OPEN;
+  return Status::OK();
+}
+
+// Opens an already-written container for appending: validates the file and
+// supplemental headers (discarding the parsed supplemental header), then
+// positions the write offset at the end of the file.
+Status WritablePBContainerFile::OpenExisting() {
+  DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
+  RETURN_NOT_OK(ParsePBFileHeader(writer_.get(), &offset_, &version_));
+  ContainerSupHeaderPB sup_header;
+  RETURN_NOT_OK(ReadSupplementalHeader(writer_.get(), version_, &offset_, &sup_header));
+  RETURN_NOT_OK(writer_->Size(&offset_)); // Reset the write offset to the end of the file.
+  state_ = FileState::OPEN;
+  return Status::OK();
+}
+
+// Appends raw bytes at the current write offset. The lock serializes
+// concurrent appenders so offset reservation and write stay atomic.
+Status WritablePBContainerFile::AppendBytes(const Slice& data) {
+  std::lock_guard<Mutex> l(offset_lock_);
+  RETURN_NOT_OK(writer_->Write(offset_, data));
+  offset_ += data.size();
+  return Status::OK();
+}
+
+// Serializes 'msg' as a container record (see AppendMsgToBuffer() for the
+// on-disk layout) and appends it to the file.
+Status WritablePBContainerFile::Append(const Message& msg) {
+  DCHECK_EQ(FileState::OPEN, state_);
+
+  faststring buf;
+  RETURN_NOT_OK_PREPEND(AppendMsgToBuffer(msg, &buf),
+                        "Failed to prepare buffer for writing");
+  RETURN_NOT_OK_PREPEND(AppendBytes(buf), "Failed to append data to file");
+
+  return Status::OK();
+}
+
+Status WritablePBContainerFile::Flush() {
+  DCHECK_EQ(FileState::OPEN, state_);
+
+  // TODO: Flush just the dirty bytes.
+  // NOTE(review): the (0, 0) arguments presumably mean "the whole file" —
+  // confirm against RWFile::Flush() documentation.
+  RETURN_NOT_OK_PREPEND(writer_->Flush(RWFile::FLUSH_ASYNC, 0, 0), "Failed to Flush() file");
+
+  return Status::OK();
+}
+
+Status WritablePBContainerFile::Sync() {
+  DCHECK_EQ(FileState::OPEN, state_);
+
+  RETURN_NOT_OK_PREPEND(writer_->Sync(), "Failed to Sync() file");
+
+  return Status::OK();
+}
+
+// Idempotent: closing an already-closed (or never-opened) file is a no-op.
+Status WritablePBContainerFile::Close() {
+  if (state_ != FileState::CLOSED) {
+    // Mark closed before attempting the Close() so we never retry on failure.
+    state_ = FileState::CLOSED;
+    Status s = writer_->Close();
+    writer_.reset();
+    RETURN_NOT_OK_PREPEND(s, "Failed to Close() file");
+  }
+  return Status::OK();
+}
+
+const string& WritablePBContainerFile::filename() const {
+  return writer_->filename();
+}
+
+// Serializes 'msg' as one container record and appends it to 'buf'.
+// Record layout (all fixed32 fields):
+//   V1:  [data_len][data][crc32c(data_len || data)]
+//   V2+: [data_len][crc32c(data_len)][data][crc32c(data)]
+Status WritablePBContainerFile::AppendMsgToBuffer(const Message& msg, faststring* buf) {
+  DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg);
+  int data_len = msg.ByteSize();
+  // Messages >2G cannot be serialized due to overflow computing ByteSize.
+  DCHECK_GE(data_len, 0) << "Error computing ByteSize";
+  uint64_t record_buflen =  sizeof(uint32_t) + data_len + sizeof(uint32_t);
+  if (version_ >= 2) {
+    record_buflen += sizeof(uint32_t); // Additional checksum just for the length.
+  }
+
+  // Grow the buffer to hold the new data.
+  uint64_t record_offset = buf->size();
+  buf->resize(record_offset + record_buflen);
+  uint8_t* dst = buf->data() + record_offset;
+
+  // Serialize the data length.
+  size_t cur_offset = 0;
+  InlineEncodeFixed32(dst + cur_offset, static_cast<uint32_t>(data_len));
+  cur_offset += sizeof(uint32_t);
+
+  // For version >= 2: Serialize the checksum of the data length.
+  if (version_ >= 2) {
+    // NOTE(review): this checksums the native in-memory bytes of 'data_len',
+    // while the reader checksums the serialized fixed32 bytes; the two agree
+    // only when native byte order matches the encoding (little-endian) —
+    // consistent with upstream, but worth confirming on big-endian ports.
+    uint32_t length_checksum = crc::Crc32c(&data_len, sizeof(data_len));
+    InlineEncodeFixed32(dst + cur_offset, length_checksum);
+    cur_offset += sizeof(uint32_t);
+  }
+
+  // Serialize the data.
+  uint64_t data_offset = cur_offset;
+  if (PREDICT_FALSE(!msg.SerializeWithCachedSizesToArray(dst + cur_offset))) {
+    return Status::IOError("Failed to serialize PB to array");
+  }
+  cur_offset += data_len;
+
+  // Calculate and serialize the data checksum.
+  // For version 1, this is the checksum of the len + data.
+  // For version >= 2, this is only the checksum of the data.
+  uint32_t data_checksum;
+  if (version_ == 1) {
+    data_checksum = crc::Crc32c(dst, cur_offset);
+  } else {
+    data_checksum = crc::Crc32c(dst + data_offset, data_len);
+  }
+  InlineEncodeFixed32(dst + cur_offset, data_checksum);
+  cur_offset += sizeof(uint32_t);
+
+  DCHECK_EQ(record_buflen, cur_offset) << "Serialized unexpected number of total bytes";
+  return Status::OK();
+}
+
+// Copies 'desc' and its transitive dependencies into 'output', ordered so
+// that every file descriptor appears after all of its dependencies
+// (a topological order, computed with an explicit work deque).
+void WritablePBContainerFile::PopulateDescriptorSet(
+    const FileDescriptor* desc, FileDescriptorSet* output) {
+  // Because we don't compile protobuf with TSAN enabled, copying the
+  // static PB descriptors in this function ends up triggering a lot of
+  // race reports. We suppress the reports, but TSAN still has to walk
+  // the stack, etc, and this function becomes very slow. So, we ignore
+  // TSAN here.
+  debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
+
+  FileDescriptorSet all_descs;
+
+  // Tracks all schemas that have been added to 'unemitted' at one point
+  // or another. Is a superset of 'unemitted' and only ever grows.
+  unordered_set<const FileDescriptor*> processed;
+
+  // Tracks all remaining unemitted schemas.
+  deque<const FileDescriptor*> unemitted;
+
+  InsertOrDie(&processed, desc);
+  unemitted.push_front(desc);
+  while (!unemitted.empty()) {
+    const FileDescriptor* proto = unemitted.front();
+
+    // The current schema is emitted iff we've processed (i.e. emitted) all
+    // of its dependencies.
+    bool emit = true;
+    for (int i = 0; i < proto->dependency_count(); i++) {
+      const FileDescriptor* dep = proto->dependency(i);
+      // InsertIfNotPresent() returns true only on first sight of 'dep',
+      // so each dependency is queued exactly once.
+      if (InsertIfNotPresent(&processed, dep)) {
+        unemitted.push_front(dep);
+        emit = false;
+      }
+    }
+    if (emit) {
+      unemitted.pop_front();
+      proto->CopyTo(all_descs.mutable_file()->Add());
+    }
+  }
+  all_descs.Swap(output);
+}
+
+// Constructs a reader around an already-open RandomAccessFile. The
+// container is not usable until Open() succeeds; until then state_ is
+// NOT_INITIALIZED and version_ holds an invalid sentinel value.
+ReadablePBContainerFile::ReadablePBContainerFile(shared_ptr<RandomAccessFile> reader)
+  : state_(FileState::NOT_INITIALIZED),
+    version_(kPBContainerInvalidVersion),
+    offset_(0),
+    reader_(std::move(reader)) {
+}
+
+// Closes the container (releasing the underlying reader) if the caller
+// did not already do so; Close() is safe to call repeatedly.
+ReadablePBContainerFile::~ReadablePBContainerFile() {
+  Close();
+}
+
+// Reads and validates the file header, then the supplemental header
+// record describing the PB schema/type of the records that follow.
+// On success the container is OPEN and offset_ points at the first
+// user record.
+Status ReadablePBContainerFile::Open() {
+  DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
+  // Parses the magic/version fields (and, per the format docs, the header
+  // checksum for v2+); sets version_ and advances offset_.
+  RETURN_NOT_OK(ParsePBFileHeader(reader_.get(), &offset_, &version_));
+  ContainerSupHeaderPB sup_header;
+  RETURN_NOT_OK(ReadSupplementalHeader(reader_.get(), version_, &offset_, &sup_header));
+  // Take ownership of the embedded FileDescriptorSet so callers can
+  // introspect the schema via protos() without re-reading the file.
+  protos_.reset(sup_header.release_protos());
+  pb_type_ = sup_header.pb_type();
+  state_ = FileState::OPEN;
+  return Status::OK();
+}
+
+// Reads the next length-prefixed, checksummed record into 'msg',
+// advancing offset_. See the class declaration for the Status codes
+// returned at end-of-file, on truncation, and on corruption.
+Status ReadablePBContainerFile::ReadNextPB(Message* msg) {
+  DCHECK_EQ(FileState::OPEN, state_);
+  return ReadFullPB(reader_.get(), version_, &offset_, msg);
+}
+
+Status ReadablePBContainerFile::Dump(ostream* os, bool oneline) {
+  DCHECK_EQ(FileState::OPEN, state_);
+
+  // Use the embedded protobuf information from the container file to
+  // create the appropriate kind of protobuf Message.
+  //
+  // Loading the schemas into a DescriptorDatabase (and not directly into
+  // a DescriptorPool) defers resolution until FindMessageTypeByName()
+  // below, allowing for schemas to be loaded in any order.
+  SimpleDescriptorDatabase db;
+  for (int i = 0; i < protos()->file_size(); i++) {
+    if (!db.Add(protos()->file(i))) {
+      return Status::Corruption("Descriptor not loaded", Substitute(
+          "Could not load descriptor for PB type $0 referenced in container file",
+          pb_type()));
+    }
+  }
+  DescriptorPool pool(&db);
+  const Descriptor* desc = pool.FindMessageTypeByName(pb_type());
+  if (!desc) {
+    return Status::NotFound("Descriptor not found", Substitute(
+        "Could not find descriptor for PB type $0 referenced in container file",
+        pb_type()));
+  }
+  DynamicMessageFactory factory;
+  const Message* prototype = factory.GetPrototype(desc);
+  if (!prototype) {
+    return Status::NotSupported("Descriptor not supported", Substitute(
+        "Descriptor $0 referenced in container file not supported",
+        pb_type()));
+  }
+  unique_ptr<Message> msg(prototype->New());
+
+  // Dump each message in the container file.
+  int count = 0;
+  Status s;
+  for (s = ReadNextPB(msg.get());
+      s.ok();
+      s = ReadNextPB(msg.get())) {
+    if (oneline) {
+      *os << count++ << "\t" << SecureShortDebugString(*msg) << endl;
+    } else {
+      *os << "Message " << count << endl;
+      *os << "-------" << endl;
+      *os << SecureDebugString(*msg) << endl;
+      count++;
+    }
+  }
+  return s.IsEndOfFile() ? s.OK() : s;
+}
+
+// Closes the container and releases the underlying file handle.
+// Idempotent: safe to call in any state, and also invoked by the
+// destructor. Always returns OK.
+Status ReadablePBContainerFile::Close() {
+  state_ = FileState::CLOSED;
+  reader_.reset();
+  return Status::OK();
+}
+
+// Returns the container format version parsed from the file header.
+// Only valid while the file is open.
+int ReadablePBContainerFile::version() const {
+  DCHECK_EQ(FileState::OPEN, state_);
+  return version_;
+}
+
+// Returns the byte offset at which the next record read will begin.
+// Per the ReadNextPB() contract, after a Status::Incomplete this is the
+// offset where the invalid partial record starts.
+uint64_t ReadablePBContainerFile::offset() const {
+  DCHECK_EQ(FileState::OPEN, state_);
+  return offset_;
+}
+
+Status ReadPBContainerFromPath(Env* env, const std::string& path, Message* msg) {
+  unique_ptr<RandomAccessFile> file;
+  RETURN_NOT_OK(env->NewRandomAccessFile(path, &file));
+
+  ReadablePBContainerFile pb_file(std::move(file));
+  RETURN_NOT_OK(pb_file.Open());
+  RETURN_NOT_OK(pb_file.ReadNextPB(msg));
+  return pb_file.Close();
+}
+
+// Atomically writes a single-record protobuf container to 'path'.
+//
+// Crash-safety protocol: the data is first written to a uniquely-named
+// temp file in the same directory, then renamed over 'path'. The temp
+// file is deleted on any failure (via ScopedFileDeleter), so a crash
+// never leaves a partially-written file at 'path'.
+Status WritePBContainerToPath(Env* env, const std::string& path,
+                              const Message& msg,
+                              CreateMode create,
+                              SyncMode sync) {
+  TRACE_EVENT2("io", "WritePBContainerToPath",
+               "path", path,
+               "msg_type", msg.GetTypeName());
+
+  // NOTE(review): this existence check is not atomic with the rename
+  // below, so NO_OVERWRITE appears to be best-effort under concurrent
+  // writers — confirm callers don't rely on stronger semantics.
+  if (create == NO_OVERWRITE && env->FileExists(path)) {
+    return Status::AlreadyPresent(Substitute("File $0 already exists", path));
+  }
+
+  const string tmp_template = path + kTmpInfix + kTmpTemplateSuffix;
+  string tmp_path;
+
+  unique_ptr<RWFile> file;
+  RETURN_NOT_OK(env->NewTempRWFile(RWFileOptions(), tmp_template, &tmp_path, &file));
+  // Ensures the temp file is removed if any subsequent step fails.
+  env_util::ScopedFileDeleter tmp_deleter(env, tmp_path);
+
+  WritablePBContainerFile pb_file(std::move(file));
+  RETURN_NOT_OK(pb_file.CreateNew(msg));
+  RETURN_NOT_OK(pb_file.Append(msg));
+  // Sync the data before the rename so the rename never publishes a file
+  // whose contents could still be lost on power failure.
+  if (sync == pb_util::SYNC) {
+    RETURN_NOT_OK(pb_file.Sync());
+  }
+  RETURN_NOT_OK(pb_file.Close());
+  RETURN_NOT_OK_PREPEND(env->RenameFile(tmp_path, path),
+                        "Failed to rename tmp file to " + path);
+  // The rename succeeded; the temp path no longer needs cleanup.
+  tmp_deleter.Cancel();
+  if (sync == pb_util::SYNC) {
+    // Sync the parent directory so the rename itself is durable.
+    RETURN_NOT_OK_PREPEND(env->SyncDir(DirName(path)),
+                          "Failed to SyncDir() parent of " + path);
+  }
+  return Status::OK();
+}
+
+
+// Factory: wraps a copy of 'msg' in a lazily-stringified trace object.
+// The PB is copied now; conversion to JSON is deferred until the trace
+// buffer is dumped via AppendAsTraceFormat().
+scoped_refptr<debug::ConvertableToTraceFormat> PbTracer::TracePb(const Message& msg) {
+  return make_scoped_refptr(new PbTracer(msg));
+}
+
+// Takes a deep copy of 'msg' so the tracer is independent of the
+// caller's message lifetime.
+PbTracer::PbTracer(const Message& msg) : msg_(msg.New()) {
+  msg_->CopyFrom(msg);
+}
+
+// Renders the stored PB as compact JSON and appends it to 'out'.
+// Note: although this method is const, it mutates the pointee of msg_
+// (TruncateFields modifies the message in place before rendering).
+void PbTracer::AppendAsTraceFormat(std::string* out) const {
+  pb_util::TruncateFields(msg_.get(), kMaxFieldLengthToTrace);
+  std::ostringstream ss;
+  JsonWriter jw(&ss, JsonWriter::COMPACT);
+  jw.Protobuf(*msg_);
+  out->append(ss.str());
+}
+
+} // namespace pb_util
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/pb_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pb_util.h b/be/src/kudu/util/pb_util.h
new file mode 100644
index 0000000..24119c9
--- /dev/null
+++ b/be/src/kudu/util/pb_util.h
@@ -0,0 +1,492 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Utilities for dealing with protocol buffers.
+// These are mostly just functions similar to what are found in the protobuf
+// library itself, but using kudu::faststring instances instead of STL strings.
+#ifndef KUDU_UTIL_PB_UTIL_H
+#define KUDU_UTIL_PB_UTIL_H
+
+#include <memory>
+#include <string>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/mutex.h"
+
+namespace google {
+namespace protobuf {
+class FileDescriptor;
+class FileDescriptorSet;
+class MessageLite;
+class Message;
+}
+}
+
+namespace kudu {
+
+class Env;
+class RandomAccessFile;
+class SequentialFile;
+class Slice;
+class Status;
+class RWFile;
+
+namespace pb_util {
+
+using google::protobuf::MessageLite;
+
+// Whether newly written data must be fsynced to durable storage before
+// the write call returns.
+enum SyncMode {
+  SYNC,
+  NO_SYNC
+};
+
+// Whether an existing file at the destination path may be replaced.
+enum CreateMode {
+  OVERWRITE,
+  NO_OVERWRITE
+};
+
+// Lifecycle state shared by the readable/writable PB container classes.
+enum class FileState {
+  NOT_INITIALIZED,  // Constructed; Open()/CreateNew()/OpenExisting() not yet called.
+  OPEN,             // Ready for reads/appends.
+  CLOSED            // Close() called; no further operations permitted.
+};
+
+// The minimum valid length of a PBC file.
+extern const int kPBContainerMinimumValidLength;
+
+// See MessageLite::AppendToString
+void AppendToString(const MessageLite &msg, faststring *output);
+
+// See MessageLite::AppendPartialToString
+void AppendPartialToString(const MessageLite &msg, faststring *output);
+
+// See MessageLite::SerializeToString.
+void SerializeToString(const MessageLite &msg, faststring *output);
+
+// See MessageLite::ParseFromZeroCopyStream
+Status ParseFromSequentialFile(MessageLite *msg, SequentialFile *rfile);
+
+// Similar to MessageLite::ParseFromArray, with the difference that it returns
+// Status::Corruption() if the message could not be parsed.
+Status ParseFromArray(MessageLite* msg, const uint8_t* data, uint32_t length);
+
+// Load a protobuf from the given path.
+Status ReadPBFromPath(Env* env, const std::string& path, MessageLite* msg);
+
+// Serialize a protobuf to the given path.
+//
+// If SyncMode SYNC is provided, ensures the changes are made durable.
+Status WritePBToPath(Env* env, const std::string& path, const MessageLite& msg, SyncMode sync);
+
+// Truncate any 'bytes' or 'string' fields of this message to max_len.
+// The text "<truncated>" is appended to any such truncated fields.
+void TruncateFields(google::protobuf::Message* message, int max_len);
+
+// Redaction-sensitive variant of Message::DebugString.
+//
+// For most protobufs, this has identical output to Message::DebugString. However,
+// a field with string or binary type may be tagged with the 'kudu.REDACT' option,
+// available by importing 'pb_util.proto'. When such a field is encountered by this
+// method, its contents will be redacted using the 'KUDU_REDACT' macro as documented
+// in kudu/util/logging.h.
+std::string SecureDebugString(const google::protobuf::Message& msg);
+
+// Same as SecureDebugString() above, but equivalent to Message::ShortDebugString.
+std::string SecureShortDebugString(const google::protobuf::Message& msg);
+
+// A protobuf "container" has the following format (all integers in
+// little-endian byte order).
+//
+// <file header>
+// <1 or more records>
+//
+// Note: There are two versions (version 1 and version 2) of the record format.
+// Version 2 of the file format differs from version 1 in the following ways:
+//
+//   * Version 2 has a file header checksum.
+//   * Version 2 has separate checksums for the record length and record data
+//     fields.
+//
+// File header format
+// ------------------
+//
+// Each protobuf container file contains a file header identifying the file.
+// This includes:
+//
+// magic number: 8 byte string identifying the file format.
+//
+//    Included so that we have a minimal guarantee that this file is of the
+//    type we expect and that we are not just reading garbage.
+//
+// container_version: 4 byte unsigned integer indicating the "version" of the
+//                    container format. May be set to 1 or 2.
+//
+//    Included so that this file format may be extended at some later date
+//    while maintaining backwards compatibility.
+//
+// file_header_checksum (version 2+ only): 4 byte unsigned integer with a CRC32C
+//                                         of the magic and version fields.
+//
+//    Included so that we can validate the container version number.
+//
+// The remaining container fields are considered part of a "record". There may
+// be 1 or more records in a valid protobuf container file.
+//
+// Record format
+// -------------
+//
+// data length: 4 byte unsigned integer indicating the size of the encoded data.
+//
+//    Included because PB messages aren't self-delimiting, and thus
+//    writing a stream of messages to the same file requires
+//    delimiting each with its size.
+//
+//    See https://developers.google.com/protocol-buffers/docs/techniques?hl=zh-cn#streaming
+//    for more details.
+//
+// length checksum (version 2+ only): 4-byte unsigned integer containing the
+//                                    CRC32C checksum of "data length".
+//
+//    Included so that we may discern the difference between a truncated file
+//    and a corrupted length field.
+//
+// data: "size" bytes of protobuf data encoded according to the schema.
+//
+//    Our payload.
+//
+// data checksum: 4 byte unsigned integer containing the CRC32C checksum of "data".
+//
+//    Included to ensure validity of the data on-disk.
+//    Note: In version 1 of the file format, this is a checksum of both the
+//    "data length" and "data" fields. In version 2+, this is only a checksum
+//    of the "data" field.
+//
+// Supplemental header
+// -------------------
+//
+// A valid container must have at least one record, the first of
+// which is known as the "supplemental header". The supplemental header
+// contains additional container-level information, including the protobuf
+// schema used for the records following it. See pb_util.proto for details. As
+// a containerized PB message, the supplemental header is protected by a CRC32C
+// checksum like any other message.
+//
+// Error detection and tolerance
+// -----------------------------
+//
+// It is worth describing the kinds of errors that can be detected by the
+// protobuf container and the kinds that cannot.
+//
+// The checksums in the container are independent, not rolling. As such,
+// they won't detect the disappearance or reordering of entire protobuf
+// messages, which can happen if a range of the file is collapsed (see
+// man fallocate(2)) or if the file is otherwise manually manipulated.
+//
+// In version 1, the checksums do not protect against corruption in the data
+// length field. However, version 2 of the format resolves that problem. The
+// benefit is that version 2 files can tell the difference between a record
+// with a corrupted length field and a record that was only partially written.
+// See ReadablePBContainerFile::ReadNextPB() for discussion on how this
+// difference is expressed via the API.
+//
+// In version 1 of the format, corruption of the version field in the file
+// header is not detectable. However, version 2 of the format addresses that
+// limitation as well.
+//
+// Corruption of the protobuf data itself is detected in all versions of the
+// file format (subject to CRC32 limitations).
+//
+// The container does not include footers or periodic checkpoints. As such, it
+// will not detect if entire records are truncated.
+//
+// The design and implementation relies on data ordering guarantees provided by
+// the file system to ensure that bytes are written to a file before the file
+// metadata (file size) is updated. A partially-written record (the result of a
+// failed append) is identified by one of the following criteria:
+// 1. Too-few bytes remain in the file to constitute a valid record. For
+//    version 2, that would be fewer than 12 bytes (data len, data len
+//    checksum, and data checksum), or
+// 2. Assuming a record's data length field is valid, then fewer bytes remain
+//    in the file than are specified in the data length field (plus enough for
+//    checksums).
+// In the above scenarios, it is assumed that the system faulted while in the
+// middle of appending a record, and it is considered safe to truncate the file
+// at the beginning of the partial record.
+//
+// If filesystem preallocation is used (at the time of this writing, the
+// implementation does not support preallocation) then even version 2 of the
+// format cannot safely support culling trailing partially-written records.
+// This is because it is not possible to reliably tell the difference between a
+// partially-written record that did not complete fsync (resulting in a bad
+// checksum) vs. a record that successfully was written to disk but then fell
+// victim to bit-level disk corruption. See also KUDU-1414.
+//
+// These tradeoffs in error detection are reasonable given the failure
+// environment that Kudu operates within. We tolerate failures such as
+// "kill -9" of the Kudu process, machine power loss, or fsync/fdatasync
+// failure, but not failures like runaway processes mangling data files
+// in arbitrary ways or attackers crafting malicious data files.
+//
+// In short, no version of the file format will detect truncation of entire
+// protobuf records. Version 2 relies on ordered data flushing semantics for
+// automatic recoverability from partial record writes. Version 1 of the file
+// format cannot support automatic recoverability from partial record writes.
+//
+// For further reading on what files might look like following a normal
+// filesystem failure or disk corruption, and the likelihood of various types
+// of disk errors, see the following papers:
+//
+// https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-pillai.pdf
+// https://www.usenix.org/legacy/event/fast08/tech/full_papers/bairavasundaram/bairavasundaram.pdf
+
+// Protobuf container file opened for writing. Can be built around an existing
+// file or a completely new file.
+//
+// Every function is thread-safe unless indicated otherwise.
+class WritablePBContainerFile {
+ public:
+
+  // Initializes the class instance; writer must be open.
+  explicit WritablePBContainerFile(std::shared_ptr<RWFile> writer);
+
+  // Closes the container if not already closed.
+  ~WritablePBContainerFile();
+
+  // Writes the file header to disk and initializes the write offset to the
+  // byte after the file header. This method should NOT be called when opening
+  // an existing file for append; use OpenExisting() for that.
+  //
+  // 'msg' need not be populated; its type is used to "lock" the container
+  // to a particular protobuf message type in Append().
+  //
+  // Not thread-safe.
+  Status CreateNew(const google::protobuf::Message& msg);
+
+  // Opens an existing protobuf container file for append. The file must
+  // already have a valid file header. To initialize a new blank file for
+  // writing, use CreateNew() instead.
+  //
+  // The file header is read and the version specified there is used as the
+  // format version. The length of the file is also read and is used as the
+  // write offset for subsequent Append() calls. WritablePBContainerFile caches
+  // the write offset instead of constantly calling stat() on the file each
+  // time append is called.
+  //
+  // Not thread-safe.
+  Status OpenExisting();
+
+  // Writes a protobuf message to the container, beginning with its size
+  // and ending with its CRC32 checksum. One of CreateNew() or OpenExisting()
+  // must be called prior to calling Append(), i.e. the file must be open.
+  Status Append(const google::protobuf::Message& msg);
+
+  // Asynchronously flushes all dirty container data to the filesystem.
+  // The file must be open.
+  Status Flush();
+
+  // Synchronizes all dirty container data to the filesystem.
+  // The file must be open.
+  //
+  // Note: the parent directory is _not_ synchronized. Because the
+  // container file was provided during construction, we don't know whether
+  // it was created or reopened, and parent directory synchronization is
+  // only needed in the former case.
+  Status Sync();
+
+  // Closes the container.
+  //
+  // Not thread-safe.
+  Status Close();
+
+  // Returns the path to the container's underlying file handle.
+  const std::string& filename() const;
+
+ private:
+  friend class TestPBUtil;
+  FRIEND_TEST(TestPBUtil, TestPopulateDescriptorSet);
+
+  // Set the file format version. Only used for testing.
+  // Must be called before CreateNew().
+  Status SetVersionForTests(int version);
+
+  // Write the protobuf schemas belonging to 'desc' and all of its
+  // dependencies to 'output'.
+  //
+  // Schemas are written in dependency order (i.e. if A depends on B which
+  // depends on C, the order is C, B, A).
+  static void PopulateDescriptorSet(const google::protobuf::FileDescriptor* desc,
+                                    google::protobuf::FileDescriptorSet* output);
+
+  // Serialize the contents of 'msg' into 'buf' along with additional metadata
+  // to aid in deserialization.
+  Status AppendMsgToBuffer(const google::protobuf::Message& msg, faststring* buf);
+
+  // Append bytes to the file.
+  Status AppendBytes(const Slice& data);
+
+  // Lifecycle state of the container (see FileState above).
+  FileState state_;
+
+  // Protects offset_.
+  Mutex offset_lock_;
+
+  // Current write offset into the file. Guarded by offset_lock_.
+  uint64_t offset_;
+
+  // Protobuf container file format version (1 or 2; see the file format
+  // documentation above).
+  int version_;
+
+  // Underlying file handle that all writes go through.
+  std::shared_ptr<RWFile> writer_;
+};
+
+// Protobuf container file opened for reading.
+//
+// Can be built around a file with existing contents or an empty file (in
+// which case it's safe to interleave with WritablePBContainerFile).
+class ReadablePBContainerFile {
+ public:
+
+  // Initializes the class instance; reader must be open.
+  explicit ReadablePBContainerFile(std::shared_ptr<RandomAccessFile> reader);
+
+  // Closes the file if not already closed.
+  ~ReadablePBContainerFile();
+
+  // Reads the header information from the container and validates it.
+  // Must be called before any of the other methods.
+  Status Open();
+
+  // Reads a protobuf message from the container, validating its size and
+  // data using a CRC32 checksum. File must be open.
+  //
+  // Return values:
+  // * If there are no more records in the file, returns Status::EndOfFile.
+  // * If there is a partial record, but it is not long enough to be a full
+  //   record or the written length of the record is less than the remaining
+  //   bytes in the file, returns Status::Incomplete. If Status::Incomplete
+  //   is returned, calling offset() will return the point in the file where
+  //   the invalid partial record begins. In order to append additional records
+  //   to the file, the file must first be truncated at that offset.
+  //   Note: Version 1 of this file format will never return
+  //   Status::Incomplete() from this method.
+  // * If a corrupt record is encountered, returns Status::Corruption.
+  // * On success, stores the result in '*msg' and returns OK.
+  Status ReadNextPB(google::protobuf::Message* msg);
+
+  // Dumps any unread protobuf messages in the container to 'os'. Each
+  // message's DebugString() method is invoked to produce its textual form.
+  // File must be open.
+  //
+  // If 'oneline' is true, prints each message on a single line.
+  Status Dump(std::ostream* os, bool oneline);
+
+  // Closes the container.
+  Status Close();
+
+  // Expected PB type and schema for each message to be read.
+  //
+  // Only valid after a successful call to Open().
+  const std::string& pb_type() const { return pb_type_; }
+  const google::protobuf::FileDescriptorSet* protos() const {
+    return protos_.get();
+  }
+
+  // Return the protobuf container file format version.
+  // File must be open.
+  int version() const;
+
+  // Return current read offset.
+  // File must be open.
+  uint64_t offset() const;
+
+ private:
+  // Lifecycle state of the container (see FileState above).
+  FileState state_;
+
+  // Container file format version, parsed from the header by Open().
+  int version_;
+
+  // Byte offset at which the next record read will begin.
+  uint64_t offset_;
+
+  // The fully-qualified PB type name of the messages in the container.
+  std::string pb_type_;
+
+  // Wrapped in a unique_ptr so that clients need not include PB headers.
+  std::unique_ptr<google::protobuf::FileDescriptorSet> protos_;
+
+  // Underlying file handle that all reads go through.
+  std::shared_ptr<RandomAccessFile> reader_;
+};
+
+// Convenience functions for protobuf containers holding just one record.
+
+// Load a "containerized" protobuf from the given path.
+// If the file does not exist, returns Status::NotFound(). Otherwise, may
+// return other Status error codes such as Status::IOError.
+Status ReadPBContainerFromPath(Env* env, const std::string& path,
+                               google::protobuf::Message* msg);
+
+// Serialize a "containerized" protobuf to the given path.
+//
+// If create == NO_OVERWRITE and 'path' already exists, the function will fail.
+// If sync == SYNC, the newly created file will be fsynced before returning.
+Status WritePBContainerToPath(Env* env, const std::string& path,
+                              const google::protobuf::Message& msg,
+                              CreateMode create,
+                              SyncMode sync);
+
+// Wrapper for a protobuf message which lazily converts to JSON when
+// the trace buffer is dumped.
+//
+// When tracing, an instance of this class can be associated with
+// a given trace, instead of a stringified PB, thus avoiding doing
+// stringification inline and moving that work to the tracing process.
+//
+// Example usage:
+//  TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+//                         "response", pb_util::PbTracer::TracePb(*response_pb_),
+//                         ...);
+//
+class PbTracer : public debug::ConvertableToTraceFormat {
+ public:
+  // Maximum length to which string/bytes fields are truncated before the
+  // message is rendered into the trace (see TruncateFields).
+  enum {
+    kMaxFieldLengthToTrace = 100
+  };
+
+  // Static helper to be called when adding a stringified PB to a trace.
+  // This does not actually stringify 'msg', that will be done later
+  // when/if AppendAsTraceFormat() is called on the returned object.
+  static scoped_refptr<debug::ConvertableToTraceFormat> TracePb(
+      const google::protobuf::Message& msg);
+
+  // Takes a deep copy of 'msg'; the tracer does not retain a reference
+  // to the caller's message.
+  explicit PbTracer(const google::protobuf::Message& msg);
+
+  // Actually stringifies the PB and appends the string to 'out'.
+  void AppendAsTraceFormat(std::string* out) const override;
+ private:
+  // Private copy of the traced message. Its fields are truncated in place
+  // when the trace is rendered.
+  const std::unique_ptr<google::protobuf::Message> msg_;
+};
+
+} // namespace pb_util
+
+// TODO(todd) Replacing all Message::ToString call sites for KUDU-1812
+// is much easier if these are available in the 'kudu' namespace. We should
+// consider removing these imports and move them to all call sites.
+using pb_util::SecureDebugString; // NOLINT
+using pb_util::SecureShortDebugString; // NOLINT
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/pb_util.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pb_util.proto b/be/src/kudu/util/pb_util.proto
new file mode 100644
index 0000000..b78c0cf
--- /dev/null
+++ b/be/src/kudu/util/pb_util.proto
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+option java_package = "org.apache.kudu";
+
+import "google/protobuf/descriptor.proto";
+
+// ============================================================================
+//  Protobuf container metadata
+// ============================================================================
+
+// Supplemental protobuf container header, after the main header (see
+// pb_util.h for details).
+// Supplemental protobuf container header: the mandatory first record of
+// every container file, carrying the schema and type of all subsequent
+// records (see pb_util.h for the surrounding file format).
+message ContainerSupHeaderPB {
+  // The protobuf schema for the messages expected in this container.
+  //
+  // This schema is complete, that is, it includes all of its dependencies
+  // (i.e. other schemas defined in .proto files imported by this schema's
+  // .proto file).
+  required google.protobuf.FileDescriptorSet protos = 1;
+
+  // The PB message type expected in each data entry in this container. Must
+  // be fully qualified (i.e. kudu.tablet.TabletSuperBlockPB).
+  required string pb_type = 2;
+}
+
+// Field option marking a string/bytes field as sensitive. Fields tagged
+// with (kudu.REDACT) have their values redacted by SecureDebugString()
+// and SecureShortDebugString() (see pb_util.h).
+extend google.protobuf.FieldOptions {
+  optional bool REDACT = 50001 [default=false];
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/pb_util_test.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pb_util_test.proto b/be/src/kudu/util/pb_util_test.proto
new file mode 100644
index 0000000..bac0be0
--- /dev/null
+++ b/be/src/kudu/util/pb_util_test.proto
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+import "kudu/util/pb_util.proto";
+
+// Test-only message interleaving redacted and non-redacted string fields,
+// used to exercise the (kudu.REDACT) handling in SecureDebugString() /
+// SecureShortDebugString().
+message TestSecurePrintingPB {
+  optional string insecure1 = 1;
+  optional string secure1 = 2 [(kudu.REDACT) = true];
+  optional string insecure2 = 3;
+  optional string secure2 = 4 [(kudu.REDACT) = true];
+  repeated string repeated_secure = 5 [(kudu.REDACT) = true];
+  optional string insecure3 = 6;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/process_memory-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/process_memory-test.cc b/be/src/kudu/util/process_memory-test.cc
new file mode 100644
index 0000000..38922f5
--- /dev/null
+++ b/be/src/kudu/util/process_memory-test.cc
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <atomic>
+#include <thread>
+#include <vector>
+
+#include "kudu/util/monotime.h"
+#include "kudu/util/process_memory.h"
+#include "kudu/util/test_util.h"
+
+using std::atomic;
+using std::thread;
+using std::vector;
+
+namespace kudu {
+
+// Microbenchmark for our new/delete hooks which track process-wide
+// memory consumption.
+// Microbenchmark for our new/delete hooks which track process-wide
+// memory consumption.
+TEST(ProcessMemory, BenchmarkConsumptionTracking) {
+  const int kNumThreads = 200;
+  vector<thread> threads;
+  // Stop flag and result accumulator shared across all worker threads;
+  // both are std::atomic (default sequentially-consistent operations).
+  atomic<bool> done(false);
+  atomic<int64_t> total_count(0);
+
+  // We start many threads, each of which performs 10:1 ratio of
+  // new/delete pairs to consumption lookups. The high number
+  // of threads highlights when there is contention on central
+  // tcmalloc locks.
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.emplace_back([&]() {
+        // Count iterations locally and fold into the shared atomic once at
+        // the end, to keep contention on total_count out of the hot loop.
+        int64_t local_count = 0;
+        while (!done) {
+          for (int a = 0; a < 10; a++) {
+            // Mark 'x' volatile so that the compiler does not optimize out the
+            // allocation.
+            char* volatile x = new char[8000];
+            delete[] x;
+          }
+          process_memory::CurrentConsumption();
+          local_count++;
+        }
+        total_count += local_count;
+      });
+  }
+  // Let the workers run for a fixed wall-clock interval, then stop them.
+  double secs = 3;
+  SleepFor(MonoDelta::FromSeconds(secs));
+  done = true;
+
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  LOG(INFO) << "Performed " << total_count / secs << " iters/sec";
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/process_memory.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/process_memory.cc b/be/src/kudu/util/process_memory.cc
new file mode 100644
index 0000000..986e15a
--- /dev/null
+++ b/be/src/kudu/util/process_memory.cc
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <sys/resource.h>
+
+#include <gflags/gflags.h>
+#include <gperftools/malloc_extension.h>
+
+#include "kudu/gutil/once.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/env.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/process_memory.h"
+#include "kudu/util/random.h"
+#include "kudu/util/striped64.h"
+
+// Flags controlling process-wide memory limiting. The percentage flags below
+// are interpreted relative to the (possibly auto-sized) hard limit; see
+// DoInitLimits() in this file for how the actual byte thresholds are derived.
+DEFINE_int64(memory_limit_hard_bytes, 0,
+             "Maximum amount of memory this daemon should use, in bytes. "
+             "A value of 0 autosizes based on the total system memory. "
+             "A value of -1 disables all memory limiting.");
+TAG_FLAG(memory_limit_hard_bytes, stable);
+
+DEFINE_int32(memory_pressure_percentage, 60,
+             "Percentage of the hard memory limit that this daemon may "
+             "consume before flushing of in-memory data becomes prioritized.");
+TAG_FLAG(memory_pressure_percentage, advanced);
+
+DEFINE_int32(memory_limit_soft_percentage, 80,
+             "Percentage of the hard memory limit that this daemon may "
+             "consume before memory throttling of writes begins. The greater "
+             "the excess, the higher the chance of throttling. In general, a "
+             "lower soft limit leads to smoother write latencies but "
+             "decreased throughput, and vice versa for a higher soft limit.");
+TAG_FLAG(memory_limit_soft_percentage, advanced);
+
+DEFINE_int32(memory_limit_warn_threshold_percentage, 98,
+             "Percentage of the hard memory limit that this daemon may "
+             "consume before WARNING level messages are periodically logged.");
+TAG_FLAG(memory_limit_warn_threshold_percentage, advanced);
+
+#ifdef TCMALLOC_ENABLED
+// Only meaningful when built against tcmalloc; see GcTcmalloc() below.
+DEFINE_int32(tcmalloc_max_free_bytes_percentage, 10,
+             "Maximum percentage of the RSS that tcmalloc is allowed to use for "
+             "reserved but unallocated memory.");
+TAG_FLAG(tcmalloc_max_free_bytes_percentage, advanced);
+#endif
+
+using strings::Substitute;
+
+namespace kudu {
+namespace process_memory {
+
+namespace {
+// Byte thresholds derived from the flags above by DoInitLimits(); they remain
+// zero until initialization has run (see InitLimits()).
+int64_t g_hard_limit;
+int64_t g_soft_limit;
+int64_t g_pressure_threshold;
+
+// RNG used for the probabilistic soft-limit check in SoftLimitExceeded().
+// Allocated once in DoInitLimits() and retained for the process lifetime.
+ThreadSafeRandom* g_rand = nullptr;
+
+#ifdef TCMALLOC_ENABLED
+// Total amount of memory released since the last GC. If this
+// is greater than GC_RELEASE_SIZE, this will trigger a tcmalloc gc.
+Atomic64 g_released_memory_since_gc;
+
+// Size, in bytes, that is considered a large value for Release() (or Consume() with
+// a negative value). If tcmalloc is used, this can trigger it to GC.
+// A higher value will make us call into tcmalloc less often (and therefore more
+// efficient). A lower value will mean our memory overhead is lower.
+// TODO(todd): this is a stopgap.
+const int64_t GC_RELEASE_SIZE = 128 * 1024L * 1024L;
+
+#endif // TCMALLOC_ENABLED
+
+} // anonymous namespace
+
+
+// Flag validation
+// ------------------------------------------------------------
+// Validate that various flags are percentages.
+// Gflags validator: accepts only values in the inclusive range [0, 100].
+static bool ValidatePercentage(const char* flagname, int value) {
+  const bool in_range = (value >= 0) && (value <= 100);
+  if (!in_range) {
+    LOG(ERROR) << Substitute("$0 must be a percentage, value $1 is invalid",
+                             flagname, value);
+  }
+  return in_range;
+}
+
+// Hook the validators up at static-initialization time. The array itself is
+// never read; it exists only to sequence the RegisterFlagValidator() calls.
+static bool dummy[] = {
+  google::RegisterFlagValidator(&FLAGS_memory_limit_soft_percentage, &ValidatePercentage),
+  google::RegisterFlagValidator(&FLAGS_memory_limit_warn_threshold_percentage, &ValidatePercentage)
+#ifdef TCMALLOC_ENABLED
+  ,google::RegisterFlagValidator(&FLAGS_tcmalloc_max_free_bytes_percentage, &ValidatePercentage)
+#endif
+};
+
+
+// Wrappers around tcmalloc functionality
+// ------------------------------------------------------------
+#ifdef TCMALLOC_ENABLED
+// Fetch the numeric tcmalloc property 'prop' via MallocExtension.
+//
+// On lookup failure this logs DFATAL; in release builds (where DFATAL does
+// not abort) it now returns 0 instead of an uninitialized value, which the
+// original code would have returned because 'value' was left unset.
+static int64_t GetTCMallocProperty(const char* prop) {
+  size_t value = 0;
+  if (!MallocExtension::instance()->GetNumericProperty(prop, &value)) {
+    LOG(DFATAL) << "Failed to get tcmalloc property " << prop;
+  }
+  return value;
+}
+
+// Number of bytes currently allocated by the application, as reported by
+// tcmalloc's "generic.current_allocated_bytes" property.
+int64_t GetTCMallocCurrentAllocatedBytes() {
+  return GetTCMallocProperty("generic.current_allocated_bytes");
+}
+
+// Ask tcmalloc to hand free pages back to the OS whenever its reserved-but-
+// unused memory exceeds --tcmalloc_max_free_bytes_percentage of what the
+// application has actually allocated.
+void GcTcmalloc() {
+  TRACE_EVENT0("process", "GcTcmalloc");
+
+  // Bytes sitting in tcmalloc's 'NORMAL' free list (reserved but not in use).
+  const int64_t bytes_overhead = GetTCMallocProperty("tcmalloc.pageheap_free_bytes");
+  // Bytes allocated by the application.
+  const int64_t bytes_used = GetTCMallocCurrentAllocatedBytes();
+
+  const int64_t max_overhead =
+      bytes_used * FLAGS_tcmalloc_max_free_bytes_percentage / 100.0;
+  if (bytes_overhead <= max_overhead) {
+    return;
+  }
+
+  // Release 1MB at a time, so that tcmalloc releases its page heap lock
+  // allowing other threads to make progress. This still disrupts the current
+  // thread, but is better than disrupting all.
+  const int64_t kReleaseChunkBytes = 1024 * 1024;
+  for (int64_t extra = bytes_overhead - max_overhead; extra > 0;
+       extra -= kReleaseChunkBytes) {
+    MallocExtension::instance()->ReleaseToSystem(kReleaseChunkBytes);
+  }
+}
+#endif // TCMALLOC_ENABLED
+
+
+// Consumption and soft memory limit behavior
+// ------------------------------------------------------------
+namespace {
+// Compute the hard/soft/pressure byte thresholds from the flags and publish
+// them into the g_* globals. Must run exactly once; invoked only through
+// InitLimits() (GoogleOnce).
+void DoInitLimits() {
+  int64_t hard_bytes = FLAGS_memory_limit_hard_bytes;
+  if (hard_bytes == 0) {
+    // If no limit is provided, we'll use 80% of system RAM.
+    int64_t total_ram;
+    CHECK_OK(Env::Default()->GetTotalRAMBytes(&total_ram));
+    hard_bytes = total_ram * 4;
+    hard_bytes /= 5;
+  }
+  g_hard_limit = hard_bytes;
+  g_soft_limit = FLAGS_memory_limit_soft_percentage * g_hard_limit / 100;
+  g_pressure_threshold = FLAGS_memory_pressure_percentage * g_hard_limit / 100;
+
+  // Fixed seed: used only for soft-limit throttling decisions.
+  g_rand = new ThreadSafeRandom(1);
+
+  const double kBytesPerGB = 1024.0 * 1024.0 * 1024.0;
+  LOG(INFO) << StringPrintf("Process hard memory limit is %.6f GB",
+                            static_cast<float>(g_hard_limit) / kBytesPerGB);
+  LOG(INFO) << StringPrintf("Process soft memory limit is %.6f GB",
+                            static_cast<float>(g_soft_limit) / kBytesPerGB);
+  LOG(INFO) << StringPrintf("Process memory pressure threshold is %.6f GB",
+                            static_cast<float>(g_pressure_threshold) / kBytesPerGB);
+}
+
+// Thread-safe wrapper ensuring DoInitLimits() runs exactly once, no matter
+// how many times or from how many threads this is called. Callers invoke
+// this before reading the g_* limit globals.
+void InitLimits() {
+  static GoogleOnceType once;
+  GoogleOnceInit(&once, &DoInitLimits);
+}
+
+} // anonymous namespace
+
+int64_t CurrentConsumption() {
+#ifdef TCMALLOC_ENABLED
+  // Querying tcmalloc on every call would be expensive, so the result is
+  // cached in 'consumption' and refreshed at most once per interval.
+  const int64_t kReadIntervalMicros = 50000;
+  static Atomic64 last_read_time = 0;
+  static simple_spinlock read_lock;
+  static Atomic64 consumption = 0;
+  uint64_t time = GetMonoTimeMicros();
+  // try_lock: at most one thread refreshes the cache; all others fall
+  // through and return the (possibly slightly stale) cached value without
+  // blocking.
+  if (time > last_read_time + kReadIntervalMicros && read_lock.try_lock()) {
+    base::subtle::NoBarrier_Store(&consumption, GetTCMallocCurrentAllocatedBytes());
+    // Re-fetch the time after getting the consumption. This way, in case fetching
+    // consumption is extremely slow for some reason (eg due to lots of contention
+    // in tcmalloc) we at least ensure that we wait at least another full interval
+    // before fetching the information again.
+    time = GetMonoTimeMicros();
+    base::subtle::NoBarrier_Store(&last_read_time, time);
+    read_lock.unlock();
+  }
+
+  return base::subtle::NoBarrier_Load(&consumption);
+#else
+  // Without tcmalloc, we have no reliable way of determining our own heap
+  // size (e.g. mallinfo doesn't work in ASAN builds). So, we'll fall back
+  // to just looking at the sum of our tracked memory.
+  return MemTracker::GetRootTracker()->consumption();
+#endif
+}
+
+// Return the configured hard memory limit for the process, in bytes.
+int64_t HardLimit() {
+  // Ensure the limits have been computed. Previously this was the only
+  // accessor that skipped InitLimits(), so a caller that reached it before
+  // UnderMemoryPressure()/SoftLimitExceeded() would observe 0.
+  InitLimits();
+  return g_hard_limit;
+}
+
+// True iff current consumption has crossed the memory-pressure threshold.
+// On a true return, optionally reports consumption as a percentage of the
+// hard limit through 'current_capacity_pct'.
+bool UnderMemoryPressure(double* current_capacity_pct) {
+  InitLimits();
+  const int64_t usage = CurrentConsumption();
+  if (usage < g_pressure_threshold) {
+    return false;
+  }
+  if (current_capacity_pct != nullptr) {
+    *current_capacity_pct = static_cast<double>(usage) / g_hard_limit * 100;
+  }
+  return true;
+}
+
+// Probabilistic soft-limit check: always true above the hard limit, never
+// true below the soft limit, and linearly-increasing odds in between. On a
+// true return, optionally reports consumption as a percentage of the hard
+// limit through 'current_capacity_pct'.
+bool SoftLimitExceeded(double* current_capacity_pct) {
+  InitLimits();
+  const int64_t usage = CurrentConsumption();
+
+  bool exceeded;
+  if (usage > g_hard_limit) {
+    // Over the hard limit: always report as exceeded.
+    exceeded = true;
+  } else if (g_hard_limit == g_soft_limit) {
+    // No soft limit defined.
+    exceeded = false;
+  } else if (usage < g_soft_limit) {
+    // Below the soft-limit threshold.
+    exceeded = false;
+  } else {
+    // Inside the throttling band: draw a random offset so that the chance of
+    // reporting "exceeded" grows with the excess over the soft limit.
+    exceeded =
+        usage + g_rand->Uniform64(g_hard_limit - g_soft_limit) > g_hard_limit;
+  }
+
+  if (exceeded && current_capacity_pct != nullptr) {
+    *current_capacity_pct = static_cast<double>(usage) / g_hard_limit * 100;
+  }
+  return exceeded;
+}
+
+// Accumulate released bytes and trigger GcTcmalloc() once the running total
+// crosses GC_RELEASE_SIZE. Compiles to a no-op without tcmalloc.
+void MaybeGCAfterRelease(int64_t released_bytes) {
+#ifdef TCMALLOC_ENABLED
+  // NOTE(review): the counter is incremented by the NEGATION of
+  // 'released_bytes', so it only grows toward GC_RELEASE_SIZE when callers
+  // pass a negative value. Verify the caller's sign convention matches this.
+  int64_t now_released = base::subtle::NoBarrier_AtomicIncrement(
+      &g_released_memory_since_gc, -released_bytes);
+  if (PREDICT_FALSE(now_released > GC_RELEASE_SIZE)) {
+    // Reset before the (slow) GC so concurrent releases accumulate afresh.
+    base::subtle::NoBarrier_Store(&g_released_memory_since_gc, 0);
+    GcTcmalloc();
+  }
+#endif
+}
+
+} // namespace process_memory
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/process_memory.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/process_memory.h b/be/src/kudu/util/process_memory.h
new file mode 100644
index 0000000..1545dc7
--- /dev/null
+++ b/be/src/kudu/util/process_memory.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+namespace kudu {
+namespace process_memory {
+
+// Probabilistically returns true if the process-wide soft memory limit is exceeded.
+// The greater the excess, the higher the chance that it returns true.
+//
+// If the soft limit is exceeded and 'current_capacity_pct' is not NULL, the percentage
+// of the hard limit consumed is written to it.
+bool SoftLimitExceeded(double* current_capacity_pct);
+
+// Return true if we are under memory pressure (i.e if we are nearing the point at which
+// SoftLimitExceeded will begin to return true).
+//
+// If the process is under memory pressure, and 'current_capacity_pct' is not NULL,
+// the percentage of the hard limit consumed is written to it.
+bool UnderMemoryPressure(double* current_capacity_pct);
+
+// Potentially trigger a call to release tcmalloc memory back to the
+// OS, after the given amount of memory was released.
+void MaybeGCAfterRelease(int64_t released_bytes);
+
+// Return the total current memory consumption of the process.
+int64_t CurrentConsumption();
+
+// Return the configured hard limit for the process.
+int64_t HardLimit();
+
+#ifdef TCMALLOC_ENABLED
+// Get the current amount of allocated memory, according to tcmalloc.
+//
+// This should be equal to CurrentConsumption(), but is made available so that tests
+// can verify the correctness of CurrentConsumption().
+int64_t GetTCMallocCurrentAllocatedBytes();
+#endif
+
+} // namespace process_memory
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/promise.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/promise.h b/be/src/kudu/util/promise.h
new file mode 100644
index 0000000..17f8cec
--- /dev/null
+++ b/be/src/kudu/util/promise.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_PROMISE_H
+#define KUDU_UTIL_PROMISE_H
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/countdown_latch.h"
+
+namespace kudu {
+
+// Single-assignment box for a value produced at some future time: exactly one
+// producer publishes via Set(), while any number of consumers block in Get()
+// (or poll with WaitFor()) until the value is available.
+//
+// Equivalent to Guava's SettableFuture<T>.
+template<typename T>
+class Promise {
+ public:
+  Promise() : latch_(1) {}
+  ~Promise() {}
+
+  // Make the promise usable again: re-arm the latch and default-construct
+  // the stored value. Safe only if external synchronization guarantees that
+  // no thread still references the value from the previous incarnation.
+  void Reset() {
+    latch_.Reset(1);
+    val_ = T();
+  }
+
+  // Block until the producer has called Set(), then return the value.
+  const T& Get() const {
+    latch_.Wait();
+    return val_;
+  }
+
+  // Bounded wait: a pointer to the value if it was produced within 'delta',
+  // otherwise NULL. The returned pointer lives as long as this Promise.
+  const T* WaitFor(const MonoDelta& delta) const {
+    return latch_.WaitFor(delta) ? &val_ : NULL;
+  }
+
+  // Publish 'val' and wake all waiters. Calling this a second time (without
+  // an intervening Reset()) is a programming error.
+  void Set(const T& val) {
+    DCHECK_EQ(latch_.count(), 1) << "Already set!";
+    val_ = val;
+    latch_.CountDown();
+  }
+
+ private:
+  CountDownLatch latch_;
+  T val_;
+  DISALLOW_COPY_AND_ASSIGN(Promise);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_PROMISE_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/proto_container_test.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/proto_container_test.proto b/be/src/kudu/util/proto_container_test.proto
new file mode 100644
index 0000000..4707c08
--- /dev/null
+++ b/be/src/kudu/util/proto_container_test.proto
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+// Arbitrary protobuf to test writing a containerized protobuf.
+message ProtoContainerTestPB {
+  required string name = 1;   // arbitrary identifier
+  required int32 value = 2;   // arbitrary payload value
+  optional string note = 3;   // optional free-form annotation
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/proto_container_test2.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/proto_container_test2.proto b/be/src/kudu/util/proto_container_test2.proto
new file mode 100644
index 0000000..74a1ea3
--- /dev/null
+++ b/be/src/kudu/util/proto_container_test2.proto
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+// Dependency chain:
+//
+// this file --> proto_container_test.proto
+
+import "kudu/util/proto_container_test.proto";
+
+// Arbitrary protobuf that has one PB dependency
+// (on ProtoContainerTestPB from proto_container_test.proto).
+message ProtoContainerTest2PB {
+  required kudu.ProtoContainerTestPB record = 1;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/proto_container_test3.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/proto_container_test3.proto b/be/src/kudu/util/proto_container_test3.proto
new file mode 100644
index 0000000..1ed1c31
--- /dev/null
+++ b/be/src/kudu/util/proto_container_test3.proto
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+// Dependency chain:
+//
+// this file --> proto_container_test.proto
+//           --> proto_container_test2.proto --> proto_container_test.proto
+
+import "kudu/util/proto_container_test.proto";
+import "kudu/util/proto_container_test2.proto";
+
+// Arbitrary protobuf that has two PB dependencies
+// (one direct, one transitive via proto_container_test2.proto).
+message ProtoContainerTest3PB {
+  required kudu.ProtoContainerTestPB record_one = 1;
+  required kudu.ProtoContainerTest2PB record_two = 2;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/protobuf-annotations.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/protobuf-annotations.h b/be/src/kudu/util/protobuf-annotations.h
new file mode 100644
index 0000000..7fdc961
--- /dev/null
+++ b/be/src/kudu/util/protobuf-annotations.h
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Simple header which is inserted into all of our generated protobuf code.
+// We use this to hook protobuf code up to TSAN annotations.
+#ifndef KUDU_UTIL_PROTOBUF_ANNOTATIONS_H
+#define KUDU_UTIL_PROTOBUF_ANNOTATIONS_H
+
+#include "kudu/gutil/dynamic_annotations.h"
+
+// The protobuf internal headers are included before this, so we have to undefine
+// the empty definitions first.
+#undef GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN
+#undef GOOGLE_SAFE_CONCURRENT_WRITES_END
+
+// Redirect protobuf's "safe concurrent writes" markers to the TSAN
+// ignore-writes annotations so the race detector suppresses those writes.
+#define GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN ANNOTATE_IGNORE_WRITES_BEGIN
+#define GOOGLE_SAFE_CONCURRENT_WRITES_END ANNOTATE_IGNORE_WRITES_END
+
+#endif /* KUDU_UTIL_PROTOBUF_ANNOTATIONS_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/protobuf_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/protobuf_util.h b/be/src/kudu/util/protobuf_util.h
new file mode 100644
index 0000000..cc88eda
--- /dev/null
+++ b/be/src/kudu/util/protobuf_util.h
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_PROTOBUF_UTIL_H
+#define KUDU_UTIL_PROTOBUF_UTIL_H
+
+#include <google/protobuf/message_lite.h>
+
+namespace kudu {
+
+// Serialize 'msg' and append the bytes to 'output'.
+//
+// Marked 'inline' because this function is DEFINED in a header: without it,
+// every translation unit that includes this file emits its own external
+// definition, violating the one-definition rule at link time.
+//
+// Returns true on success; a serialized-size mismatch is fatal (CHECK).
+inline bool AppendPBToString(const google::protobuf::MessageLite &msg, faststring *output) {
+  int old_size = output->size();
+  int byte_size = msg.ByteSize();
+  // Grow the buffer once, then serialize directly into it.
+  output->resize(old_size + byte_size);
+  uint8* start = reinterpret_cast<uint8*>(output->data() + old_size);
+  uint8* end = msg.SerializeWithCachedSizesToArray(start);
+  CHECK(end - start == byte_size)
+    << "Error in serialization. byte_size=" << byte_size
+    << " new ByteSize()=" << msg.ByteSize()
+    << " end-start=" << (end-start);
+  return true;
+}
+
+} // namespace kudu
+
+#endif



Mime
View raw message