kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [3/5] kudu git commit: Add snapshot scans to fuzz-itest
Date Wed, 30 Nov 2016 17:07:43 GMT
Add snapshot scans to fuzz-itest

This adds a new operation to fuzz-itest: snapshot scans
at a timestamp.

When generating random operations, scans at a timestamp are
now in the mix. The generator records how timestamps will
progress with writes and is careful to make sure that scans
happen in the past. After the test case is generated, but
before it is run, the timestamps for the scans are deduplicated
and sorted so that we save the state immediately before those
(and only those) timestamps.

This would fail very often before the REINSERT patches but
passes with them.

Ran 500 loops of fuzz-itest in dist-test in ASAN with:
- KUDU_ALLOW_SLOW_TESTS=1
- --stress_cpu_threads=4

All tests passed. Result:
http://dist-test.cloudera.org//job?job_id=david.alves.1480096588.30319

Change-Id: I4d15129e83c5c9afa9b643e674c8a23e18a2fa0e
Reviewed-on: http://gerrit.cloudera.org:8080/4996
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/10f2ce20
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/10f2ce20
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/10f2ce20

Branch: refs/heads/master
Commit: 10f2ce208fd646c6d2db0f8277ab0342b52cd080
Parents: abab2ce
Author: David Alves <dralves@apache.org>
Authored: Mon Nov 7 21:13:27 2016 -0800
Committer: David Ribeiro Alves <dralves@apache.org>
Committed: Wed Nov 30 10:14:38 2016 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/fuzz-itest.cc | 220 +++++++++++++++++++++-----
 src/kudu/tablet/key_value_test_schema.h  |  21 ++-
 src/kudu/tserver/tablet_service.cc       |  13 +-
 3 files changed, 206 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/10f2ce20/src/kudu/integration-tests/fuzz-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/fuzz-itest.cc b/src/kudu/integration-tests/fuzz-itest.cc
index f4a8c8d..00cd2ec 100644
--- a/src/kudu/integration-tests/fuzz-itest.cc
+++ b/src/kudu/integration-tests/fuzz-itest.cc
@@ -21,6 +21,7 @@
 #include <glog/stl_logging.h>
 #include <gtest/gtest.h>
 #include <gflags/gflags.h>
+#include <list>
 #include <string>
 #include <vector>
 
@@ -33,6 +34,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/mini_master.h"
 #include "kudu/integration-tests/mini_cluster.h"
+#include "kudu/server/logical_clock.h"
 #include "kudu/tablet/key_value_test_schema.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_peer.h"
@@ -44,13 +46,9 @@
 
 DEFINE_int32(keyspace_size, 2,  "number of distinct primary keys to test with");
 DECLARE_bool(enable_maintenance_manager);
+DECLARE_bool(scanner_allow_snapshot_scans_with_logical_timestamps);
 DECLARE_bool(use_hybrid_clock);
 
-using boost::optional;
-using std::string;
-using std::vector;
-using std::unique_ptr;
-
 // The type of operation in a sequence of operations generated by
 // the fuzz test.
 enum TestOpType {
@@ -66,6 +64,7 @@ enum TestOpType {
   TEST_MAJOR_COMPACT_DELTAS,
   TEST_COMPACT_TABLET,
   TEST_RESTART_TS,
+  TEST_SCAN_AT_TIMESTAMP,
   TEST_NUM_OP_TYPES // max value for enum
 };
 MAKE_ENUM_LIMITS(TestOpType, TEST_INSERT, TEST_NUM_OP_TYPES);
@@ -74,11 +73,10 @@ const char* kTableName = "table";
 
 namespace kudu {
 
+using boost::optional;
 using client::KuduClient;
 using client::KuduClientBuilder;
-using client::KuduColumnSchema;
 using client::KuduDelete;
-using client::KuduInsert;
 using client::KuduPredicate;
 using client::KuduScanBatch;
 using client::KuduScanner;
@@ -92,6 +90,12 @@ using client::KuduUpsert;
 using client::KuduValue;
 using client::KuduWriteOperation;
 using client::sp::shared_ptr;
+using std::list;
+using std::map;
+using std::string;
+using std::vector;
+using std::unique_ptr;
+using strings::Substitute;
 
 namespace tablet {
 
@@ -107,7 +111,8 @@ const char* TestOpType_names[] = {
   "TEST_MINOR_COMPACT_DELTAS",
   "TEST_MAJOR_COMPACT_DELTAS",
   "TEST_COMPACT_TABLET",
-  "TEST_RESTART_TS"
+  "TEST_RESTART_TS",
+  "TEST_SCAN_AT_TIMESTAMP"
 };
 
 // An operation in a fuzz-test sequence.
@@ -115,11 +120,13 @@ struct TestOp {
   // The op to run.
   TestOpType type;
 
-  // For INSERT/UPSERT/UPDATE/DELETE, the key of the row to be modified. Otherwise, unused.
-  int row_key;
+  // For INSERT/UPSERT/UPDATE/DELETE, the key of the row to be modified.
+  // For SCAN_AT_TIMESTAMP the timestamp of the scan.
+  // Otherwise, unused.
+  int val;
 
   string ToString() const {
-    return strings::Substitute("{$0, $1}", TestOpType_names[type], row_key);
+    return strings::Substitute("{$0, $1}", TestOpType_names[type], val);
   }
 };
 
@@ -134,6 +141,7 @@ class FuzzTest : public KuduTest {
   FuzzTest() {
     FLAGS_enable_maintenance_manager = false;
     FLAGS_use_hybrid_clock = false;
+    FLAGS_scanner_allow_snapshot_scans_with_logical_timestamps = true;
 
     schema_ = client::KuduSchemaFromSchema(CreateKeyValueTestSchema());
   }
@@ -277,8 +285,98 @@ class FuzzTest : public KuduTest {
     return boost::none;
   }
 
+  // Checks that the rows in 'found' match the state we've stored 'saved_values_' corresponding
+  // to 'timestamp'. 'errors' collects the errors found. If 'errors' is not empty it means
the
+  // check failed.
+  void CheckRowsMatchAtTimestamp(int timestamp,
+                                 vector<ExpectedKeyValueRow> rows_found,
+                                 list<string>* errors) {
+    int saved_timestamp = -1;
+    auto iter = saved_values_.upper_bound(timestamp);
+    if (iter == saved_values_.end()) {
+      if (!rows_found.empty()) {
+        for (auto& found_row : rows_found) {
+          errors->push_back(Substitute("Found unexpected row: $0", found_row.ToString()));
+        }
+      }
+    } else {
+      saved_timestamp = iter->first;
+      const auto& saved = (*iter).second;
+      int found_idx = 0;
+      int expected_values_counter = 0;
+      for (auto& expected : saved) {
+        if (expected) {
+          expected_values_counter++;
+          ExpectedKeyValueRow expected_val = expected.value();
+          if (found_idx >= rows_found.size()) {
+            errors->push_back(Substitute("Didn't find expected value: $0",
+                                         expected_val.ToString()));
+            break;
+          }
+          ExpectedKeyValueRow found_val = rows_found[found_idx++];
+          if (expected_val.key != found_val.key) {
+            errors->push_back(Substitute("Mismached key. Expected: $0 Found: $1",
+                                         expected_val.ToString(), found_val.ToString()));
+            continue;
+          }
+          if (expected_val.val != found_val.val) {
+            errors->push_back(Substitute("Mismached value. Expected: $0 Found: $1",
+                                         expected_val.ToString(), found_val.ToString()));
+            continue;
+          }
+        }
+      }
+      if (rows_found.size() != expected_values_counter) {
+        errors->push_back(Substitute("Mismatched size. Expected: $0 rows. Found: $1 rows.",
+                                     expected_values_counter, rows_found.size()));
+        for (auto& found_row : rows_found) {
+          errors->push_back(Substitute("Found unexpected row: $0", found_row.ToString()));
+        }
+      }
+    }
+    if (!errors->empty()) {
+      errors->push_front(Substitute("Found errors while comparing a snapshot scan at $0
with the "
+                                    "values saved at $1. Errors:",
+                                    timestamp, saved_timestamp));
+    }
+  }
+
+  // Scan the tablet at 'timestamp' and compare the result to the saved values.
+  void CheckScanAtTimestamp(int timestamp) {
+    KuduScanner s(table_.get());
+    ASSERT_OK(s.SetReadMode(KuduScanner::ReadMode::READ_AT_SNAPSHOT));
+    ASSERT_OK(s.SetSnapshotRaw(timestamp));
+    ASSERT_OK(s.SetOrderMode(KuduScanner::OrderMode::ORDERED));
+    ASSERT_OK(s.Open());
+    vector<ExpectedKeyValueRow> found;
+    while (s.HasMoreRows()) {
+      KuduScanBatch batch;
+      ASSERT_OK(s.NextBatch(&batch));
+      for (KuduScanBatch::RowPtr row : batch) {
+        ExpectedKeyValueRow ret;
+        ASSERT_OK(row.GetInt32(0, &ret.key));
+        if (!row.IsNull(1)) {
+          ret.val = 0;
+          ASSERT_OK(row.GetInt32(1, ret.val.get_ptr()));
+        }
+        found.push_back(ret);
+      }
+    }
+
+    list<string> errors;
+    CheckRowsMatchAtTimestamp(timestamp, std::move(found), &errors);
+
+    string final_error;
+    if (!errors.empty()) {
+      for (const string& error : errors) {
+        final_error.append("\n" + error);
+      }
+      FAIL() << final_error;
+    }
+  }
+
  protected:
-  void RunFuzzCase(const vector<TestOp>& ops,
+  void RunFuzzCase(const vector<TestOp>& test_ops,
                    int update_multiplier);
 
   KuduSchema schema_;
@@ -287,6 +385,10 @@ class FuzzTest : public KuduTest {
   shared_ptr<KuduSession> session_;
   shared_ptr<KuduTable> table_;
 
+  map<int,
+      vector<optional<ExpectedKeyValueRow>>,
+      std::greater<int>> saved_values_;
+
   scoped_refptr<TabletPeer> tablet_peer_;
 };
 
@@ -294,6 +396,7 @@ class FuzzTest : public KuduTest {
 // fuzz test.
 void GenerateTestCase(vector<TestOp>* ops, int len) {
   vector<bool> exists(FLAGS_keyspace_size);
+  int op_timestamps = 0;
   bool ops_pending = false;
   bool data_in_mrs = false;
   bool worth_compacting = false;
@@ -346,6 +449,7 @@ void GenerateTestCase(vector<TestOp>* ops, int len) {
         if (ops_pending) {
           ops->push_back({TEST_FLUSH_OPS, 0});
           ops_pending = false;
+          op_timestamps++;
         }
         break;
       case TEST_FLUSH_TABLET:
@@ -388,6 +492,14 @@ void GenerateTestCase(vector<TestOp>* ops, int len) {
       case TEST_RESTART_TS:
         ops->push_back({TEST_RESTART_TS, 0});
         break;
+      case TEST_SCAN_AT_TIMESTAMP: {
+        int timestamp = 1;
+        if (op_timestamps > 0) {
+          timestamp = (rand() % op_timestamps) + 1;
+        }
+        ops->push_back({TEST_SCAN_AT_TIMESTAMP, timestamp});
+        break;
+      }
       default:
         LOG(FATAL);
     }
@@ -409,35 +521,66 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
   // into a test method in order to reproduce a failure.
   LOG(INFO) << "test case:\n" << DumpTestCase(test_ops);
 
+  // Keep the vector of timestamps we'll scan at so that we save the expected state at those
times.
+  vector<int> timestamps_to_scan;
+  for (const TestOp& test_op : test_ops) {
+    if (test_op.type == TEST_SCAN_AT_TIMESTAMP) {
+      timestamps_to_scan.push_back(test_op.val);
+    }
+  }
+  // Sort the scan timestamps in reverse order so that we can keep popping from the back
and remove
+  // duplicates.
+  sort(timestamps_to_scan.begin(), timestamps_to_scan.end(), std::greater<int>());
+  timestamps_to_scan.erase(unique(timestamps_to_scan.begin(),
+                                  timestamps_to_scan.end()),
+                           timestamps_to_scan.end() );
+
   vector<optional<ExpectedKeyValueRow>> cur_val(FLAGS_keyspace_size);
   vector<optional<ExpectedKeyValueRow>> pending_val(FLAGS_keyspace_size);
 
   int i = 0;
   for (const TestOp& test_op : test_ops) {
-    optional<ExpectedKeyValueRow> val_in_table = GetRow(test_op.row_key);
-    EXPECT_EQ(cur_val[test_op.row_key], val_in_table);
+    switch (test_op.type) {
+      case TEST_INSERT:
+      case TEST_UPSERT:
+      case TEST_UPSERT_PK_ONLY:
+      case TEST_UPDATE:
+      case TEST_DELETE:
+        EXPECT_EQ(cur_val[test_op.val], GetRow(test_op.val));
+        break;
+      default: break;
+    }
 
     LOG(INFO) << test_op.ToString();
     switch (test_op.type) {
       case TEST_INSERT:
       case TEST_UPSERT:
       case TEST_UPSERT_PK_ONLY: {
-        pending_val[test_op.row_key] = InsertOrUpsertRow(
-            test_op.row_key, i++, pending_val[test_op.row_key], test_op.type);
+        pending_val[test_op.val] = InsertOrUpsertRow(
+            test_op.val, i++, pending_val[test_op.val], test_op.type);
         break;
       }
       case TEST_UPDATE:
         for (int j = 0; j < update_multiplier; j++) {
-          pending_val[test_op.row_key] = MutateRow(test_op.row_key, i++);
+          pending_val[test_op.val] = MutateRow(test_op.val, i++);
         }
         break;
       case TEST_DELETE:
-        pending_val[test_op.row_key] = DeleteRow(test_op.row_key);
+        pending_val[test_op.val] = DeleteRow(test_op.val);
         break;
-      case TEST_FLUSH_OPS:
+      case TEST_FLUSH_OPS: {
         FlushSessionOrDie(session_);
         cur_val = pending_val;
+        int current_time = down_cast<kudu::server::LogicalClock*>(
+            tablet()->clock().get())->GetCurrentTime();
+        // Check if the next snapshot scan has a time that is higher than the current time.
+        // If it is, then store the state so that we can match it later to the scanned state.
+        if (!timestamps_to_scan.empty() && current_time >= timestamps_to_scan.back())
{
+          saved_values_[current_time] = cur_val;
+          timestamps_to_scan.pop_back();
+        }
         break;
+      }
       case TEST_FLUSH_TABLET:
         ASSERT_OK(tablet()->Flush());
         break;
@@ -456,17 +599,19 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
       case TEST_RESTART_TS:
         NO_FATALS(RestartTabletServer());
         break;
+      case TEST_SCAN_AT_TIMESTAMP:
+        NO_FATALS(CheckScanAtTimestamp(test_op.val));
+        break;
       default:
         LOG(FATAL) << test_op.type;
     }
   }
 }
 
-
 // Generates a random test sequence and runs it.
 // The logs of this test are designed to easily be copy-pasted and create
 // more specific test cases like TestFuzz<N> below.
-TEST_F(FuzzTest, TestFuzz) {
+TEST_F(FuzzTest, TestRandomFuzz) {
   SeedRandom();
   vector<TestOp> test_ops;
   GenerateTestCase(&test_ops, AllowSlowTests() ? 1000 : 50);
@@ -476,7 +621,7 @@ TEST_F(FuzzTest, TestFuzz) {
 // Generates a random test case, but the UPDATEs are all repeated many times.
 // This results in very large batches which are likely to span multiple delta blocks
 // when flushed.
-TEST_F(FuzzTest, TestFuzzHugeBatches) {
+TEST_F(FuzzTest, TestRandomFuzzHugeBatches) {
   SeedRandom();
   vector<TestOp> test_ops;
   GenerateTestCase(&test_ops, AllowSlowTests() ? 500 : 50);
@@ -490,27 +635,26 @@ TEST_F(FuzzTest, TestFuzzHugeBatches) {
   RunFuzzCase(test_ops, update_multiplier);
 }
 
-// A particular test case which previously failed TestFuzz.
 TEST_F(FuzzTest, TestFuzz1) {
   vector<TestOp> test_ops = {
-    // Get an inserted row in a DRS.
-    {TEST_INSERT, 0},
-    {TEST_FLUSH_OPS, 0},
-    {TEST_FLUSH_TABLET, 0},
+      // Get an inserted row in a DRS.
+      {TEST_INSERT, 0},
+      {TEST_FLUSH_OPS, 0},
+      {TEST_FLUSH_TABLET, 0},
 
-    // DELETE in DMS, INSERT in MRS and flush again.
-    {TEST_DELETE, 0},
-    {TEST_INSERT, 0},
-    {TEST_FLUSH_OPS, 0},
-    {TEST_FLUSH_TABLET, 0},
+      // DELETE in DMS, INSERT in MRS and flush again.
+      {TEST_DELETE, 0},
+      {TEST_INSERT, 0},
+      {TEST_FLUSH_OPS, 0},
+      {TEST_FLUSH_TABLET, 0},
 
-    // State:
-    // RowSet RowSet(0):
-    //   (int32 key=1, int32 val=NULL) Undos: [@1(DELETE)] Redos (in DMS): [@2 DELETE]
-    // RowSet RowSet(1):
-    //   (int32 key=1, int32 val=NULL) Undos: [@2(DELETE)] Redos: []
+      // State:
+      // RowSet RowSet(0):
+      //   (int32 key=1, int32 val=NULL) Undos: [@1(DELETE)] Redos (in DMS): [@2 DELETE]
+      // RowSet RowSet(1):
+      //   (int32 key=1, int32 val=NULL) Undos: [@2(DELETE)] Redos: []
 
-    {TEST_COMPACT_TABLET, 0},
+      {TEST_COMPACT_TABLET, 0},
   };
   RunFuzzCase(test_ops);
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/10f2ce20/src/kudu/tablet/key_value_test_schema.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/key_value_test_schema.h b/src/kudu/tablet/key_value_test_schema.h
index e81a4f2..479be39 100644
--- a/src/kudu/tablet/key_value_test_schema.h
+++ b/src/kudu/tablet/key_value_test_schema.h
@@ -21,10 +21,11 @@
 #pragma once
 
 #include <boost/optional.hpp>
-#include <boost/optional/optional_io.hpp>
 #include <iostream>
+#include <string>
 
 #include "kudu/common/schema.h"
+#include "kudu/gutil/strings/substitute.h"
 
 namespace kudu {
 
@@ -40,6 +41,16 @@ struct ExpectedKeyValueRow {
   bool operator==(const ExpectedKeyValueRow& other) const {
     return key == other.key && val == other.val;
   }
+
+  string ToString() const {
+    string ret = strings::Substitute("{$0,", key);
+    if (val == boost::none) {
+      ret.append("NULL}");
+    } else {
+      ret.append(strings::Substitute("$0}", *val));
+    }
+    return ret;
+  }
 };
 
 inline Schema CreateKeyValueTestSchema() {
@@ -48,13 +59,7 @@ inline Schema CreateKeyValueTestSchema() {
 }
 
 inline std::ostream& operator<<(std::ostream& o, const ExpectedKeyValueRow&
t) {
-  o << "{" << t.key << ", ";
-  if (t.val == boost::none) {
-    o << "NULL";
-  } else {
-    o << *t.val;
-  }
-  return o << "}";
+  return o << t.ToString();
 }
 
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/10f2ce20/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index e0b334f..5177a5a 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -75,6 +75,10 @@ DEFINE_int32(scanner_batch_size_rows, 100,
 TAG_FLAG(scanner_batch_size_rows, advanced);
 TAG_FLAG(scanner_batch_size_rows, runtime);
 
+DEFINE_bool(scanner_allow_snapshot_scans_with_logical_timestamps, false,
+            "If set, the server will support snapshot scans with logical timestamps.");
+TAG_FLAG(scanner_allow_snapshot_scans_with_logical_timestamps, unsafe);
+
 // Fault injection flags.
 DEFINE_int32(scanner_inject_latency_on_each_batch_ms, 0,
              "If set, the scanner will pause the specified number of milliesconds "
@@ -1747,13 +1751,18 @@ Status TabletServiceImpl::HandleScanAtSnapshot(const NewScanRequestPB&
scan_pb,
   // ... else we use the client provided one, but make sure it is not too far
   // in the future as to be invalid.
   } else {
-    tmp_snap_timestamp.FromUint64(scan_pb.snap_timestamp());
+
     Timestamp max_allowed_ts;
     Status s = server_->clock()->GetGlobalLatest(&max_allowed_ts);
-    if (!s.ok()) {
+    if (s.IsNotSupported() &&
+        PREDICT_TRUE(!FLAGS_scanner_allow_snapshot_scans_with_logical_timestamps)) {
       return Status::NotSupported("Snapshot scans not supported on this server",
                                   s.ToString());
     }
+    tmp_snap_timestamp.FromUint64(scan_pb.snap_timestamp());
+
+    // Note: if 'max_allowed_ts' is not obtained from clock_->GetGlobalLatest() it's guaranteed
+    // to be higher than 'tmp_snap_timestamp'.
     if (tmp_snap_timestamp > max_allowed_ts) {
       return Status::InvalidArgument(
           Substitute("Snapshot time $0 in the future. Max allowed timestamp is $1",


Mime
View raw message