kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [4/4] kudu git commit: KUDU-463. Add checksumming to cfile
Date Wed, 10 May 2017 18:45:16 GMT
KUDU-463. Add checksumming to cfile

Adds optional checksumming and validation to cfile blocks.

Introduces 2 flags to control cfile checksumming:
- cfile_write_checksums (default false)
- cfile_verify_checksums (default true)

cfile_write_checksums is used in the CFileWriter to enable computing and
appending Crc32 checksums to the end of each cfile block, header and footer

cfile_write_checksums is defaulted to false to ensure upgrades don't
immediately result in performance degredation and incompatible data on
downgrade. It can and likely should be defaulted to true in a later release.

When cfile_write_checksums is set to true, the existing incompatible_features
bitset in the cfile footer is used. A "checksum" bit is set to ensure a clear
error message when older versions of Kudu try to read the file. If checksums
are not written the incompatible_features "checksum" bit is not set.

cfile_verify_checksums is used in the CFileReader to enable validating the
data against the written checksum. cfile_verify_checksums is defaulted to
true since validation only occurs if checksums are written. Any data that
was written before checksumming was an option or when cfile_write_checksums
was false will not be verified.

Change-Id: I6756834cd7f27af258797a3654a95244abeb0976
Reviewed-on: http://gerrit.cloudera.org:8080/6630
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>
Reviewed-by: Adar Dembo <adar@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b927e80a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b927e80a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b927e80a

Branch: refs/heads/master
Commit: b927e80a805a0ac078c22c49071e35962f7141d3
Parents: d3f7b38
Author: Grant Henke <granthenke@gmail.com>
Authored: Tue May 2 11:33:06 2017 -0500
Committer: Adar Dembo <adar@cloudera.com>
Committed: Wed May 10 18:43:42 2017 +0000

----------------------------------------------------------------------
 docs/design-docs/cfile.md      |  27 +++++-
 src/kudu/cfile/cfile-test.cc   |  96 +++++++++++++++++++++-
 src/kudu/cfile/cfile_reader.cc | 159 ++++++++++++++++++++++++++----------
 src/kudu/cfile/cfile_reader.h  |   5 +-
 src/kudu/cfile/cfile_util.h    |  10 +++
 src/kudu/cfile/cfile_writer.cc |  59 +++++++++++--
 src/kudu/cfile/cfile_writer.h  |   1 +
 src/kudu/util/crc-test.cc      |  14 +++-
 src/kudu/util/crc.cc           |   6 ++
 src/kudu/util/crc.h            |   4 +
 10 files changed, 325 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b927e80a/docs/design-docs/cfile.md
----------------------------------------------------------------------
diff --git a/docs/design-docs/cfile.md b/docs/design-docs/cfile.md
index 90a065d..7f4a0f4 100644
--- a/docs/design-docs/cfile.md
+++ b/docs/design-docs/cfile.md
@@ -28,16 +28,22 @@ Header
 ------
 
 <magic>: see below
-<header length>: 32-bit unsigned integer length delimiter
+<header length>: 32-bit unsigned integer length of the header protobuf
 <header>: CFileHeaderPB protobuf
+<checksum>: An optional Crc32 checksum of the magic, length, and protobuf
 
+Block
+-----
+<data>: see below
+<checksum>: An optional Crc32 checksum of the data
 
 Footer
 ------
 
+<checksum>: An optional Crc32 checksum of the protobuf, magic, and length
 <footer>: CFileFooterPB protobuf
 <magic>: see below
-<footer length> (length of protobuf)
+<footer length>: 32-bit unsigned integer length of the footer protobuf
 
 
 Magic strings
@@ -110,7 +116,7 @@ Each integer is relative to the min element in the header.
 
 ==============================
 
-Nullable Columns
+Nullable Columns:
 
 If a column is marked as nullable in the schema, a bitmap is used to keep track
 of the null and not null rows.
@@ -195,3 +201,18 @@ An index block is encoded similarly for both types of indexes:
 The trailer protobuf includes a field which designates whether the block
 is a leaf node or internal node of the B-Tree, allowing a reader to know
 whether the pointer is to another index block or to a data block.
+
+==============================
+
+Checksums:
+
+Checksums can be optionally written and verified.
+
+When checksums for the header, data, and footer are written in the CFile,
+the incompatible_features bitset in the CFile footer is used. A "checksum"
+bit is set to ensure the reader knows if checksums exist.
+
+When reading a CFile the footer should be read first to find if the
+file contains checksums. If the incompatible_features bitset indicates
+checksums exist, the reader can optionally validate them against the
+read data.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/b927e80a/src/kudu/cfile/cfile-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile-test.cc b/src/kudu/cfile/cfile-test.cc
index 7fa56f5..72a91fe 100644
--- a/src/kudu/cfile/cfile-test.cc
+++ b/src/kudu/cfile/cfile-test.cc
@@ -37,6 +37,8 @@
 
 DECLARE_string(block_cache_type);
 DECLARE_string(cfile_do_on_finish);
+DECLARE_bool(cfile_write_checksums);
+DECLARE_bool(cfile_verify_checksums);
 
 #if defined(__linux__)
 DECLARE_string(nvm_cache_path);
@@ -209,7 +211,6 @@ class TestCFile : public CFileTestBase {
     TimeSeekAndReadFileWithNulls(generator, block_id, n);
   }
 
-
   void TestReadWriteRawBlocks(CompressionType compression, int num_entries) {
     // Test Write
     unique_ptr<WritableBlock> sink;
@@ -219,6 +220,7 @@ class TestCFile : public CFileTestBase {
     opts.write_posidx = true;
     opts.write_validx = false;
     opts.storage_attributes.cfile_block_size = FLAGS_cfile_test_block_size;
+    opts.storage_attributes.compression = compression;
     opts.storage_attributes.encoding = PLAIN_ENCODING;
     CFileWriter w(opts, GetTypeInfo(STRING), false, std::move(sink));
     ASSERT_OK(w.Start());
@@ -238,6 +240,13 @@ class TestCFile : public CFileTestBase {
     unique_ptr<CFileReader> reader;
     ASSERT_OK(CFileReader::Open(std::move(source), ReaderOptions(), &reader));
 
+    ASSERT_EQ(reader->footer().compression(), compression);
+    if (FLAGS_cfile_write_checksums) {
+      ASSERT_TRUE(reader->footer().incompatible_features() & IncompatibleFeatures::CHECKSUM);
+    } else {
+      ASSERT_FALSE(reader->footer().incompatible_features() & IncompatibleFeatures::CHECKSUM);
+    }
+
     gscoped_ptr<IndexTreeIterator> iter;
     iter.reset(IndexTreeIterator::Create(reader.get(), reader->posidx_root()));
     ASSERT_OK(iter->SeekToFirst());
@@ -311,6 +320,44 @@ class TestCFile : public CFileTestBase {
       LOG(INFO) << "End readfile";
     }
   }
+
+  Status CorruptAndReadBlock(const BlockId block_id, const uint64_t corrupt_offset,
+                             uint8_t flip_bit) {
+    // Read the input block
+    unique_ptr<ReadableBlock> source;
+    RETURN_NOT_OK(fs_manager_->OpenBlock(block_id, &source));
+    uint64_t file_size;
+    RETURN_NOT_OK(source->Size(&file_size));
+    uint8_t data_scratch[file_size];
+    Slice data(data_scratch, file_size);
+    RETURN_NOT_OK(source->Read(0, &data));
+
+    // Corrupt the data and write to a new block
+    uint8_t orig = data.data()[corrupt_offset];
+    uint8_t corrupt = orig ^ (static_cast<uint8_t>(1) << flip_bit);
+    data.mutable_data()[corrupt_offset] = corrupt;
+    unique_ptr<fs::WritableBlock> writer;
+    RETURN_NOT_OK(fs_manager_->CreateNewBlock(&writer));
+    RETURN_NOT_OK(writer->Append(data));
+    RETURN_NOT_OK(writer->Close());
+
+    // Open and read the corrupt block with the CFileReader
+    unique_ptr<ReadableBlock> corrupt_source;
+    RETURN_NOT_OK(fs_manager_->OpenBlock(writer->id(), &corrupt_source));
+    unique_ptr<CFileReader> reader;
+    RETURN_NOT_OK(CFileReader::Open(std::move(corrupt_source), ReaderOptions(), &reader));
+    gscoped_ptr<IndexTreeIterator> iter;
+    iter.reset(IndexTreeIterator::Create(reader.get(), reader->posidx_root()));
+    RETURN_NOT_OK(iter->SeekToFirst());
+
+    do {
+      BlockHandle dblk_data;
+      BlockPointer blk_ptr = iter->GetCurrentBlockPointer();
+      RETURN_NOT_OK(reader->ReadBlock(blk_ptr, CFileReader::DONT_CACHE_BLOCK, &dblk_data));
+    } while (iter->Next().ok());
+
+    return Status::OK();
+  }
 };
 
 // Subclass of TestCFile which is parameterized on the block cache type.
@@ -759,6 +806,53 @@ TEST_P(TestCFileBothCacheTypes, TestAppendRaw) {
   TestReadWriteRawBlocks(ZLIB, 1000);
 }
 
+TEST_P(TestCFileBothCacheTypes, TestChecksumFlags) {
+  for (bool write_checksums : {false, true}) {
+    for (bool verify_checksums : {false, true}) {
+      FLAGS_cfile_write_checksums = write_checksums;
+      FLAGS_cfile_verify_checksums = verify_checksums;
+      TestReadWriteRawBlocks(NO_COMPRESSION, 1000);
+      TestReadWriteRawBlocks(SNAPPY, 1000);
+    }
+  }
+}
+
+TEST_P(TestCFileBothCacheTypes, TestDataCorruption) {
+  FLAGS_cfile_write_checksums = true;
+  FLAGS_cfile_verify_checksums = true;
+
+  // Write some data
+  unique_ptr<WritableBlock> sink;
+  ASSERT_OK(fs_manager_->CreateNewBlock(&sink));
+  BlockId id = sink->id();
+  WriterOptions opts;
+  opts.write_posidx = true;
+  opts.write_validx = false;
+  opts.storage_attributes.cfile_block_size = FLAGS_cfile_test_block_size;
+  opts.storage_attributes.encoding = PLAIN_ENCODING;
+  CFileWriter w(opts, GetTypeInfo(STRING), false, std::move(sink));
+  w.AddMetadataPair("header_key", "header_value");
+  ASSERT_OK(w.Start());
+  vector<Slice> slices;
+  slices.push_back(Slice("HelloWorld"));
+  ASSERT_OK(w.AppendRawBlock(slices, 1, nullptr, Slice(), "raw-data"));
+  ASSERT_OK(w.Finish());
+
+  // Get the final size of the data
+  unique_ptr<ReadableBlock> source;
+  ASSERT_OK(fs_manager_->OpenBlock(id, &source));
+  uint64_t file_size;
+  ASSERT_OK(source->Size(&file_size));
+
+  // Corrupt each bit and verify a corruption status is returned
+  for (size_t i = 0; i < file_size; i++) {
+    for (uint8_t flip = 0; flip < 8; flip++) {
+      Status s = CorruptAndReadBlock(id, i, flip);
+      ASSERT_TRUE(s.IsCorruption());
+    }
+  }
+}
+
 TEST_P(TestCFileBothCacheTypes, TestNullInts) {
   UInt32DataGenerator<true> generator;
   TestNullTypes(&generator, PLAIN_ENCODING, NO_COMPRESSION);

http://git-wip-us.apache.org/repos/asf/kudu/blob/b927e80a/src/kudu/cfile/cfile_reader.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_reader.cc b/src/kudu/cfile/cfile_reader.cc
index 5135dc2..bc00359 100644
--- a/src/kudu/cfile/cfile_reader.cc
+++ b/src/kudu/cfile/cfile_reader.cc
@@ -35,6 +35,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/coding.h"
 #include "kudu/util/compression/compression_codec.h"
+#include "kudu/util/crc.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/malloc.h"
@@ -50,6 +51,10 @@ DEFINE_bool(cfile_lazy_open, true,
             "Allow lazily opening of cfiles");
 TAG_FLAG(cfile_lazy_open, hidden);
 
+DEFINE_bool(cfile_verify_checksums, true,
+            "Verify the checksum for each block on read if one exists");
+TAG_FLAG(cfile_verify_checksums, evolving);
+
 using kudu::fs::ReadableBlock;
 using strings::Substitute;
 using std::unique_ptr;
@@ -129,30 +134,20 @@ Status CFileReader::OpenNoInit(unique_ptr<ReadableBlock> block,
   return Status::OK();
 }
 
-Status CFileReader::ReadMagicAndLength(uint64_t offset, uint32_t *len) {
-  TRACE_EVENT1("io", "CFileReader::ReadMagicAndLength",
-               "cfile", ToString());
-  uint8_t scratch[kMagicAndLengthSize];
-  Slice slice(scratch, kMagicAndLengthSize);
-
-  RETURN_NOT_OK(block_->Read(offset, &slice));
-
-  return ParseMagicAndLength(slice, &cfile_version_, len);
-}
-
 Status CFileReader::InitOnce() {
   VLOG(1) << "Initializing CFile with ID " << block_->id().ToString();
   TRACE_COUNTER_INCREMENT("cfile_init", 1);
 
-  RETURN_NOT_OK(ReadAndParseHeader());
-
+  // Parse Footer first to find unsupported features.
   RETURN_NOT_OK(ReadAndParseFooter());
 
-  if (PREDICT_FALSE(footer_->incompatible_features() != 0)) {
-    // Currently we do not support any incompatible features.
+  RETURN_NOT_OK(ReadAndParseHeader());
+
+  if (PREDICT_FALSE(footer_->incompatible_features() & ~IncompatibleFeatures::SUPPORTED))
{
     return Status::NotSupported(Substitute(
-        "cfile uses features from an incompatible version: $0",
-        footer_->incompatible_features()));
+        "cfile uses features from an incompatible bitset value $0 vs supported $1 ",
+        footer_->incompatible_features(),
+        IncompatibleFeatures::SUPPORTED));
   }
 
   type_info_ = GetTypeInfo(footer_->data_type());
@@ -184,15 +179,38 @@ Status CFileReader::ReadAndParseHeader() {
   // First read and parse the "pre-header", which lets us know
   // that it is indeed a CFile and tells us the length of the
   // proper protobuf header.
+  uint8_t mal_scratch[kMagicAndLengthSize];
+  Slice mal(mal_scratch, kMagicAndLengthSize);
+  RETURN_NOT_OK(block_->Read(0, &mal));
   uint32_t header_size;
-  RETURN_NOT_OK(ReadMagicAndLength(0, &header_size));
+  RETURN_NOT_OK(ParseMagicAndLength(mal, &cfile_version_, &header_size));
+
+  // Quick check to ensure the header size is reasonable.
+  if (header_size >= file_size_ - kMagicAndLengthSize) {
+    return Status::Corruption("invalid header size");
+  }
+
+  // Setup the data slices.
+  uint64_t off = kMagicAndLengthSize;
+  uint8_t header_scratch[header_size];
+  Slice header(header_scratch, header_size);
+  uint8_t checksum_scratch[kChecksumSize];
+  Slice checksum(checksum_scratch, kChecksumSize);
+
+  // Read the header and checksum if needed.
+  vector<Slice> results = { header };
+  if (has_checksums() && FLAGS_cfile_verify_checksums) {
+    results.push_back(checksum);
+  }
+  RETURN_NOT_OK(block_->ReadV(off, &results));
 
-  // Now read the protobuf header.
-  uint8_t header_space[header_size];
-  Slice header_slice(header_space, header_size);
+  if (has_checksums() && FLAGS_cfile_verify_checksums) {
+    RETURN_NOT_OK(VerifyChecksum({ mal, header }, checksum));
+  }
+
+  // Parse the protobuf header.
   header_.reset(new CFileHeaderPB());
-  RETURN_NOT_OK(block_->Read(kMagicAndLengthSize, &header_slice));
-  if (!header_->ParseFromArray(header_slice.data(), header_size)) {
+  if (!header_->ParseFromArray(header.data(), header.size())) {
     return Status::Corruption("Invalid cfile pb header");
   }
 
@@ -201,7 +219,6 @@ Status CFileReader::ReadAndParseHeader() {
   return Status::OK();
 }
 
-
 Status CFileReader::ReadAndParseFooter() {
   TRACE_EVENT1("io", "CFileReader::ReadAndParseFooter",
                "cfile", ToString());
@@ -210,22 +227,46 @@ Status CFileReader::ReadAndParseFooter() {
     "file too short: " << file_size_;
 
   // First read and parse the "post-footer", which has magic
-  // and the length of the actual protobuf footer
+  // and the length of the actual protobuf footer.
+  uint8_t mal_scratch[kMagicAndLengthSize];
+  Slice mal(mal_scratch, kMagicAndLengthSize);
+  RETURN_NOT_OK(block_->Read(file_size_ - kMagicAndLengthSize, &mal));
   uint32_t footer_size;
-  RETURN_NOT_OK_PREPEND(ReadMagicAndLength(file_size_ - kMagicAndLengthSize, &footer_size),
-                        "Failed to read magic and length from end of file");
+  RETURN_NOT_OK(ParseMagicAndLength(mal, &cfile_version_, &footer_size));
 
-  // Now read the protobuf footer.
+  // Quick check to ensure the footer size is reasonable.
+  if (footer_size >= file_size_ - kMagicAndLengthSize) {
+    return Status::Corruption("invalid footer size");
+  }
+
+  uint8_t footer_scratch[footer_size];
+  Slice footer(footer_scratch, footer_size);
+
+  uint8_t checksum_scratch[kChecksumSize];
+  Slice checksum(checksum_scratch, kChecksumSize);
+
+  // Read both the header and checksum in one call.
+  // We read the checksum position in case one exists.
+  // This is done to avoid the need for a follow up read call.
+  vector<Slice> results = { checksum, footer };
+  uint64_t off = file_size_ - kMagicAndLengthSize - footer_size - kChecksumSize;
+  RETURN_NOT_OK(block_->ReadV(off, &results));
+
+  // Parse the protobuf footer.
+  // This needs to be done before validating the checksum since the
+  // incompatible_features flag tells us if a checksum exists at all.
   footer_.reset(new CFileFooterPB());
-  uint8_t footer_space[footer_size];
-  Slice footer_slice(footer_space, footer_size);
-  uint64_t off = file_size_ - kMagicAndLengthSize - footer_size;
-  RETURN_NOT_OK(block_->Read(off, &footer_slice));
-  if (!footer_->ParseFromArray(footer_slice.data(), footer_size)) {
+  if (!footer_->ParseFromArray(footer.data(), footer.size())) {
     return Status::Corruption("Invalid cfile pb footer");
   }
 
-  // Verify if the compression codec is available
+  // Verify the footer checksum if needed.
+  if (has_checksums() && FLAGS_cfile_verify_checksums) {
+    // If a checksum exists it was pre-read.
+    RETURN_NOT_OK(VerifyChecksum({ footer, mal }, checksum));
+  }
+
+  // Verify if the compression codec is available.
   if (footer_->compression() != NO_COMPRESSION) {
     RETURN_NOT_OK(GetCompressionCodec(footer_->compression(), &codec_));
   }
@@ -235,6 +276,24 @@ Status CFileReader::ReadAndParseFooter() {
   return Status::OK();
 }
 
+bool CFileReader::has_checksums() const {
+  return footer_->incompatible_features() & IncompatibleFeatures::CHECKSUM;
+}
+
+Status CFileReader::VerifyChecksum(const std::vector<Slice>& data, const Slice&
checksum) const {
+  uint32_t expected_checksum = DecodeFixed32(checksum.data());
+  uint32_t checksum_value = 0;
+  for (auto& d : data) {
+    checksum_value = crc::Crc32c(d.data(), d.size(), checksum_value);
+  }
+  if (PREDICT_FALSE(checksum_value != expected_checksum)) {
+    return Status::Corruption(
+        Substitute("Checksum does not match: $0 vs expected $1",
+                   checksum_value, expected_checksum));
+  }
+  return Status::OK();
+}
+
 namespace {
 
 // ScratchMemory acts as a holder for the destination buffer for a block read.
@@ -351,21 +410,37 @@ Status CFileReader::ReadBlock(const BlockPointer &ptr, CacheControl
cache_contro
   TRACE_COUNTER_INCREMENT("cfile_cache_miss", 1);
   TRACE_COUNTER_INCREMENT(CFILE_CACHE_MISS_BYTES_METRIC_NAME, ptr.size());
 
+  uint32_t data_size = ptr.size();
+  if (has_checksums()) {
+    if (kChecksumSize > data_size) {
+      return Status::Corruption("invalid data size");
+    }
+    data_size -= kChecksumSize;
+  }
+
   ScratchMemory scratch;
   // If we are reading uncompressed data and plan to cache the result,
   // then we should allocate our scratch memory directly from the cache.
   // This avoids an extra memory copy in the case of an NVM cache.
   if (codec_ == nullptr && cache_control == CACHE_BLOCK) {
-    scratch.TryAllocateFromCache(cache, key, ptr.size());
+    scratch.TryAllocateFromCache(cache, key, data_size);
   } else {
-    scratch.AllocateFromHeap(ptr.size());
+    scratch.AllocateFromHeap(data_size);
   }
   uint8_t* buf = scratch.get();
+  Slice block(buf, data_size);
+  uint8_t checksum_scratch[kChecksumSize];
+  Slice checksum(checksum_scratch, kChecksumSize);
+
+  // Read the data and checksum if needed.
+  vector<Slice> results = { block };
+  if (has_checksums() && FLAGS_cfile_verify_checksums) {
+    results.push_back(checksum);
+  }
+  RETURN_NOT_OK(block_->ReadV(ptr.offset(), &results));
 
-  Slice block(buf, ptr.size());
-  RETURN_NOT_OK(block_->Read(ptr.offset(), &block));
-  if (block.size() != ptr.size()) {
-    return Status::IOError("Could not read full block length");
+  if (has_checksums() && FLAGS_cfile_verify_checksums) {
+    RETURN_NOT_OK(VerifyChecksum({ block }, checksum));
   }
 
   // Decompress the block
@@ -375,7 +450,7 @@ Status CFileReader::ReadBlock(const BlockPointer &ptr, CacheControl
cache_contro
     Status s = uncompressor.Init();
     if (!s.ok()) {
       LOG(WARNING) << "Unable to validate compressed block at "
-                   << ptr.offset() << " of size " << ptr.size() <<
": "
+                   << ptr.offset() << " of size " << block.size() <<
": "
                    << s.ToString();
       return s;
     }
@@ -392,7 +467,7 @@ Status CFileReader::ReadBlock(const BlockPointer &ptr, CacheControl
cache_contro
     s = uncompressor.UncompressIntoBuffer(decompressed_scratch.get());
     if (!s.ok()) {
       LOG(WARNING) << "Unable to uncompress block at " << ptr.offset()
-                   << " of size " << ptr.size() << ": " << s.ToString();
+                   << " of size " <<  block.size() << ": " << s.ToString();
       return s;
     }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/b927e80a/src/kudu/cfile/cfile_reader.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_reader.h b/src/kudu/cfile/cfile_reader.h
index a1b9956..f7a108c 100644
--- a/src/kudu/cfile/cfile_reader.h
+++ b/src/kudu/cfile/cfile_reader.h
@@ -168,6 +168,9 @@ class CFileReader {
     return BlockPointer(footer().validx_info().root_block());
   }
 
+  // Returns true if the file has checksums on the header, footer, and data blocks.
+  bool has_checksums() const;
+
   // Can be called before Init().
   std::string ToString() const { return block_->id().ToString(); }
 
@@ -181,9 +184,9 @@ class CFileReader {
   // Callback used in 'init_once_' to initialize this cfile.
   Status InitOnce();
 
-  Status ReadMagicAndLength(uint64_t offset, uint32_t *len);
   Status ReadAndParseHeader();
   Status ReadAndParseFooter();
+  Status VerifyChecksum(const std::vector<Slice>& data, const Slice& checksum)
const;
 
   // Returns the memory usage of the object including the object itself.
   size_t memory_footprint() const;

http://git-wip-us.apache.org/repos/asf/kudu/blob/b927e80a/src/kudu/cfile/cfile_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_util.h b/src/kudu/cfile/cfile_util.h
index 54a456b..640ea9c 100644
--- a/src/kudu/cfile/cfile_util.h
+++ b/src/kudu/cfile/cfile_util.h
@@ -35,6 +35,16 @@ namespace cfile {
 class CFileReader;
 class CFileIterator;
 
+// Used to set the CFileFooterPB bitset tracking incompatible features
+enum IncompatibleFeatures {
+  NONE = 0,
+
+  // Write a crc32 checksum at the end of each cfile block
+  CHECKSUM = 1 << 0,
+
+  SUPPORTED = NONE | CHECKSUM
+};
+
 struct WriterOptions {
   // Approximate size of index blocks.
   //

http://git-wip-us.apache.org/repos/asf/kudu/blob/b927e80a/src/kudu/cfile/cfile_writer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_writer.cc b/src/kudu/cfile/cfile_writer.cc
index 5d5ad3c..a8819ff 100644
--- a/src/kudu/cfile/cfile_writer.cc
+++ b/src/kudu/cfile/cfile_writer.cc
@@ -31,6 +31,7 @@
 #include "kudu/gutil/endian.h"
 #include "kudu/util/coding.h"
 #include "kudu/util/compression/compression_codec.h"
+#include "kudu/util/crc.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/hexdump.h"
@@ -62,6 +63,10 @@ DEFINE_string(cfile_do_on_finish, "flush",
               "Possible values are 'close', 'flush', or 'nothing'.");
 TAG_FLAG(cfile_do_on_finish, experimental);
 
+DEFINE_bool(cfile_write_checksums, false,
+            "Write CRC32 checksums for each block");
+TAG_FLAG(cfile_write_checksums, evolving);
+
 using google::protobuf::RepeatedPtrField;
 using kudu::fs::ScopedWritableBlockCloser;
 using kudu::fs::WritableBlock;
@@ -75,6 +80,7 @@ namespace cfile {
 const char kMagicStringV1[] = "kuducfil";
 const char kMagicStringV2[] = "kuducfl2";
 const int kMagicLength = 8;
+const size_t kChecksumSize = sizeof(uint32_t);
 
 static const size_t kMinBlockSize = 512;
 
@@ -167,14 +173,25 @@ Status CFileWriter::Start() {
 
   uint32_t pb_size = header.ByteSize();
 
-  faststring buf;
+  faststring header_str;
   // First the magic.
-  buf.append(kMagicStringV2);
+  header_str.append(kMagicStringV2);
   // Then Length-prefixed header.
-  PutFixed32(&buf, pb_size);
-  pb_util::AppendToString(header, &buf);
-  RETURN_NOT_OK_PREPEND(block_->Append(Slice(buf)), "Couldn't write header");
-  off_ += buf.size();
+  PutFixed32(&header_str, pb_size);
+  pb_util::AppendToString(header, &header_str);
+
+  vector<Slice> header_slices;
+  header_slices.push_back(Slice(header_str));
+
+  // Append header checksum.
+  uint8_t checksum_buf[kChecksumSize];
+  if (FLAGS_cfile_write_checksums) {
+    uint32_t header_checksum = crc::Crc32c(header_str.data(), header_str.size());
+    InlineEncodeFixed32(checksum_buf, header_checksum);
+    header_slices.push_back(Slice(checksum_buf, kChecksumSize));
+  }
+
+  RETURN_NOT_OK_PREPEND(WriteRawData(header_slices), "Couldn't write header");
 
   BlockBuilder *bb;
   RETURN_NOT_OK(type_encoding_info_->CreateBlockBuilder(&bb, &options_));
@@ -208,6 +225,11 @@ Status CFileWriter::FinishAndReleaseBlock(ScopedWritableBlockCloser*
closer) {
 
   state_ = kWriterFinished;
 
+  uint32_t incompatible_features = 0;
+  if (FLAGS_cfile_write_checksums) {
+    incompatible_features |= IncompatibleFeatures::CHECKSUM;
+  }
+
   // Start preparing the footer.
   CFileFooterPB footer;
   footer.set_data_type(typeinfo_->type());
@@ -215,6 +237,7 @@ Status CFileWriter::FinishAndReleaseBlock(ScopedWritableBlockCloser* closer)
{
   footer.set_encoding(type_encoding_info_->encoding_type());
   footer.set_num_values(value_count_);
   footer.set_compression(compression_);
+  footer.set_incompatible_features(incompatible_features);
 
   // Write out any pending positional index blocks.
   if (options_.write_posidx) {
@@ -243,7 +266,17 @@ Status CFileWriter::FinishAndReleaseBlock(ScopedWritableBlockCloser*
closer) {
   footer_str.append(kMagicStringV2);
   PutFixed32(&footer_str, footer.GetCachedSize());
 
-  RETURN_NOT_OK(block_->Append(footer_str));
+  // Prepend the footer checksum.
+  vector<Slice> footer_slices;
+  uint8_t checksum_buf[kChecksumSize];
+  if (FLAGS_cfile_write_checksums) {
+    uint32_t footer_checksum = crc::Crc32c(footer_str.data(), footer_str.size());
+    InlineEncodeFixed32(checksum_buf, footer_checksum);
+    footer_slices.push_back(Slice(checksum_buf, kChecksumSize));
+  }
+
+  footer_slices.push_back(Slice(footer_str));
+  RETURN_NOT_OK_PREPEND(WriteRawData(footer_slices), "Couldn't write footer");
 
   // Done with this block.
   if (FLAGS_cfile_do_on_finish == "flush") {
@@ -463,6 +496,17 @@ Status CFileWriter::AddBlock(const vector<Slice> &data_slices,
     out_slices = data_slices;
   }
 
+  // Calculate and append a data checksum.
+  uint8_t checksum_buf[kChecksumSize];
+  if (FLAGS_cfile_write_checksums) {
+    uint32_t checksum = 0;
+    for (const Slice &data : out_slices) {
+      checksum = crc::Crc32c(data.data(), data.size(), checksum);
+    }
+    InlineEncodeFixed32(checksum_buf, checksum);
+    out_slices.push_back(Slice(checksum_buf, kChecksumSize));
+  }
+
   RETURN_NOT_OK(WriteRawData(out_slices));
 
   uint64_t total_size = off_ - start_offset;
@@ -473,7 +517,6 @@ Status CFileWriter::AddBlock(const vector<Slice> &data_slices,
   return Status::OK();
 }
 
-
 Status CFileWriter::WriteRawData(const vector<Slice>& data) {
   size_t data_size = accumulate(data.begin(), data.end(), static_cast<size_t>(0),
                                 [&](int sum, const Slice& curr) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/b927e80a/src/kudu/cfile/cfile_writer.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_writer.h b/src/kudu/cfile/cfile_writer.h
index b2dfac9..5766684 100644
--- a/src/kudu/cfile/cfile_writer.h
+++ b/src/kudu/cfile/cfile_writer.h
@@ -56,6 +56,7 @@ class IndexTreeBuilder;
 extern const char kMagicStringV1[];
 extern const char kMagicStringV2[];
 extern const int kMagicLength;
+extern const size_t kChecksumSize;
 
 class NullBitmapBuilder {
  public:

http://git-wip-us.apache.org/repos/asf/kudu/blob/b927e80a/src/kudu/util/crc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/crc-test.cc b/src/kudu/util/crc-test.cc
index 6da023f..2c6db4b 100644
--- a/src/kudu/util/crc-test.cc
+++ b/src/kudu/util/crc-test.cc
@@ -47,6 +47,8 @@ class CrcTest : public KuduTest {
 // Basic functionality test.
 TEST_F(CrcTest, TestCRC32C) {
   const string test_data("abcdefgh");
+  const uint64_t kExpectedCrc = 0xa9421b7; // Known value from crcutil usage test program.
+
   Crc* crc32c = GetCrc32cInstance();
   uint64_t data_crc = 0;
   crc32c->Compute(test_data.data(), test_data.length(), &data_crc);
@@ -55,7 +57,17 @@ TEST_F(CrcTest, TestCRC32C) {
   LOG(INFO) << "CRC32C of " << test_data << " is: 0x" << output <<
" (full 64 bits)";
   output = FastHex32ToBuffer(static_cast<uint32_t>(data_crc), buf);
   LOG(INFO) << "CRC32C of " << test_data << " is: 0x" << output <<
" (truncated 32 bits)";
-  ASSERT_EQ(0xa9421b7, data_crc); // Known value from crcutil usage test program.
+  ASSERT_EQ(kExpectedCrc, data_crc);
+
+  // Using helper
+  uint64_t data_crc2 = Crc32c(test_data.data(), test_data.length());
+  ASSERT_EQ(kExpectedCrc, data_crc2);
+
+  // Using multiple chunks
+  size_t half_length = test_data.length() / 2;
+  uint64_t data_crc3 = Crc32c(test_data.data(), half_length);
+  data_crc3 = Crc32c(test_data.data() + half_length, half_length, data_crc3);
+  ASSERT_EQ(kExpectedCrc, data_crc3);
 }
 
 // Simple benchmark of CRC32C throughput.

http://git-wip-us.apache.org/repos/asf/kudu/blob/b927e80a/src/kudu/util/crc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/crc.cc b/src/kudu/util/crc.cc
index 7be2709..1534b8d 100644
--- a/src/kudu/util/crc.cc
+++ b/src/kudu/util/crc.cc
@@ -46,5 +46,11 @@ uint32_t Crc32c(const void* data, size_t length) {
   return static_cast<uint32_t>(crc32); // Only uses lower 32 bits.
 }
 
+uint32_t Crc32c(const void* data, size_t length, uint32_t prev_crc32) {
+  uint64_t crc_tmp = static_cast<uint64_t>(prev_crc32);
+  GetCrc32cInstance()->Compute(data, length, &crc_tmp);
+  return static_cast<uint32_t>(crc_tmp); // Only uses lower 32 bits.
+}
+
 } // namespace crc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/b927e80a/src/kudu/util/crc.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/crc.h b/src/kudu/util/crc.h
index 3c2277a..a5db4ea 100644
--- a/src/kudu/util/crc.h
+++ b/src/kudu/util/crc.h
@@ -33,6 +33,10 @@ Crc* GetCrc32cInstance();
 // Helper function to simply calculate a CRC32C of the given data.
 uint32_t Crc32c(const void* data, size_t length);
 
+// Given CRC value of previous chunk of data,
+// extends it to new chunk and returns the result.
+uint32_t Crc32c(const void* data, size_t length, uint32_t prev_crc32);
+
 } // namespace crc
 } // namespace kudu
 


Mime
View raw message