kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jdcry...@apache.org
Subject kudu git commit: KUDU-1933. consensus: Avoid and repair integer overflow in log index
Date Thu, 06 Apr 2017 22:39:28 GMT
Repository: kudu
Updated Branches:
  refs/heads/branch-1.3.x 9eebcdc79 -> 086d82fe3


KUDU-1933. consensus: Avoid and repair integer overflow in log index

We observed a crash on a long-running master server that looked like the
following:

  F0308 00:25:53.568773  7655 log_index.cc:171] Check failed: log_index > 0 (-2147483648 vs. 0)

It turns out that this was caused by integer overflow of the log index
field. This patch fixes this type of truncation in two places
(LogReader and MakeOpId()) and adds new tests that fail unless both
fixes are applied.

This patch also adds "repair" logic to log replay during tablet
bootstrap: when an overflowed (negative) term or index is detected in a
log entry's OpId, the overflow is "reverted" to the intended positive
value before the entry is rewritten.

Finally, this patch includes some test helper fixes to avoid log index
integer truncation in future tests.

Change-Id: I284edbde51dc50fb2f98acc83cdcc3891d37863f
Reviewed-on: http://gerrit.cloudera.org:8080/6376
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dralves@apache.org>
(cherry picked from commit 8363b74506f8513e2fa9dbf772e30d0abce4e444)
Reviewed-on: http://gerrit.cloudera.org:8080/6544
Reviewed-by: Jean-Daniel Cryans <jdcryans@apache.org>
Tested-by: Jean-Daniel Cryans <jdcryans@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/086d82fe
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/086d82fe
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/086d82fe

Branch: refs/heads/branch-1.3.x
Commit: 086d82fe3f1636d7a9d0b4d763c178e4641c5768
Parents: 9eebcdc
Author: Mike Percy <mpercy@apache.org>
Authored: Mon Mar 13 16:45:58 2017 -0700
Committer: Jean-Daniel Cryans <jdcryans@apache.org>
Committed: Thu Apr 6 22:39:11 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus-test-util.h        |  18 +--
 src/kudu/consensus/log-test-base.h              |   2 +-
 src/kudu/consensus/log-test.cc                  |  20 +++
 src/kudu/consensus/log_cache-test.cc            |  14 +-
 src/kudu/consensus/log_reader.cc                |   6 +-
 src/kudu/consensus/log_reader.h                 |   2 +-
 src/kudu/consensus/opid_util.cc                 |   4 +-
 src/kudu/consensus/opid_util.h                  |   4 +-
 src/kudu/integration-tests/ts_recovery-itest.cc | 143 ++++++++++++++++++-
 src/kudu/tablet/tablet_bootstrap-test.cc        |  23 +++
 src/kudu/tablet/tablet_bootstrap.cc             |  29 ++++
 11 files changed, 237 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/086d82fe/src/kudu/consensus/consensus-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index 2e80ae2..72cdf93 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -57,10 +57,10 @@ namespace consensus {
 using log::Log;
 using strings::Substitute;
 
-inline gscoped_ptr<ReplicateMsg> CreateDummyReplicate(int term,
-                                                      int index,
+inline gscoped_ptr<ReplicateMsg> CreateDummyReplicate(int64_t term,
+                                                      int64_t index,
                                                       const Timestamp& timestamp,
-                                                      int payload_size) {
+                                                      int64_t payload_size) {
     gscoped_ptr<ReplicateMsg> msg(new ReplicateMsg);
     OpId* id = msg->mutable_id();
     id->set_term(term);
@@ -89,13 +89,13 @@ inline RaftPeerPB FakeRaftPeerPB(const std::string& uuid) {
 inline void AppendReplicateMessagesToQueue(
     PeerMessageQueue* queue,
     const scoped_refptr<server::Clock>& clock,
-    int first,
-    int count,
-    int payload_size = 0) {
+    int64_t first,
+    int64_t count,
+    int64_t payload_size = 0) {
 
-  for (int i = first; i < first + count; i++) {
-    int term = i / 7;
-    int index = i;
+  for (int64_t i = first; i < first + count; i++) {
+    int64_t term = i / 7;
+    int64_t index = i;
     CHECK_OK(queue->AppendOperation(make_scoped_refptr_replicate(
         CreateDummyReplicate(term, index, clock->Now(), payload_size).release())));
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/086d82fe/src/kudu/consensus/log-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-test-base.h b/src/kudu/consensus/log-test-base.h
index 5ab139a..e21623e 100644
--- a/src/kudu/consensus/log-test-base.h
+++ b/src/kudu/consensus/log-test-base.h
@@ -382,7 +382,7 @@ class LogTestBase : public KuduTest {
   gscoped_ptr<MetricRegistry> metric_registry_;
   scoped_refptr<MetricEntity> metric_entity_;
   scoped_refptr<Log> log_;
-  int32_t current_index_;
+  int64_t current_index_;
   LogOptions options_;
   // Reusable entries vector that deletes the entries on destruction.
   vector<LogEntryPB* > entries_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/086d82fe/src/kudu/consensus/log-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index fc55b1d..9098479 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -999,6 +999,26 @@ TEST_P(LogTestOptionalCompression, TestReadLogWithReplacedReplicates) {
   }
 }
 
+// Ensure that we can read replicate messages from the LogReader with a very
+// high (> 32 bit) log index and term. Regression test for KUDU-1933.
+TEST_P(LogTestOptionalCompression, TestReadReplicatesHighIndex) {
+  const int64_t first_log_index = std::numeric_limits<int32_t>::max() - 3;
+  const int kSequenceLength = 10;
+
+  ASSERT_OK(BuildLog());
+  OpId op_id;
+  op_id.set_term(first_log_index);
+  op_id.set_index(first_log_index);
+  ASSERT_OK(AppendNoOps(&op_id, kSequenceLength));
+
+  shared_ptr<LogReader> reader = log_->reader();
+  vector<ReplicateMsg*> replicates;
+  ElementDeleter deleter(&replicates);
+  ASSERT_OK(reader->ReadReplicatesInRange(first_log_index, first_log_index + kSequenceLength - 1,
+                                          LogReader::kNoSizeLimit, &replicates));
+  ASSERT_EQ(kSequenceLength, replicates.size());
+}
+
 // Test various situations where we expect different segments depending on what the
 // min log index is.
 TEST_F(LogTest, TestGetGCableDataSize) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/086d82fe/src/kudu/consensus/log_cache-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_cache-test.cc b/src/kudu/consensus/log_cache-test.cc
index cb26b50..bb61454 100644
--- a/src/kudu/consensus/log_cache-test.cc
+++ b/src/kudu/consensus/log_cache-test.cc
@@ -86,14 +86,12 @@ class LogCacheTest : public KuduTest {
     CHECK_OK(s);
   }
 
-  Status AppendReplicateMessagesToCache(
-    int first,
-    int count,
-    int payload_size = 0) {
-
-    for (int i = first; i < first + count; i++) {
-      int term = i / 7;
-      int index = i;
+  Status AppendReplicateMessagesToCache(int64_t first, int64_t count,
+                                        size_t payload_size = 0) {
+
+    for (int64_t cur_index = first; cur_index < first + count; cur_index++) {
+      int64_t term = cur_index / 7;
+      int64_t index = cur_index;
       vector<ReplicateRefPtr> msgs;
       msgs.push_back(make_scoped_refptr_replicate(
                        CreateDummyReplicate(term, index, clock_->Now(), payload_size).release()));

http://git-wip-us.apache.org/repos/asf/kudu/blob/086d82fe/src/kudu/consensus/log_reader.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_reader.cc b/src/kudu/consensus/log_reader.cc
index 2c335f3..a9fe6de 100644
--- a/src/kudu/consensus/log_reader.cc
+++ b/src/kudu/consensus/log_reader.cc
@@ -63,7 +63,7 @@ using consensus::ReplicateMsg;
 using std::shared_ptr;
 using strings::Substitute;
 
-const int LogReader::kNoSizeLimit = -1;
+const int64_t LogReader::kNoSizeLimit = -1;
 
 Status LogReader::Open(FsManager* fs_manager,
                        const scoped_refptr<LogIndex>& index,
@@ -238,7 +238,7 @@ scoped_refptr<ReadableLogSegment> LogReader::GetSegmentBySequenceNumber(int64_t
 Status LogReader::ReadBatchUsingIndexEntry(const LogIndexEntry& index_entry,
                                            faststring* tmp_buf,
                                            gscoped_ptr<LogEntryBatchPB>* batch) const {
-  const int index = index_entry.op_id.index();
+  const int64_t index = index_entry.op_id.index();
 
   scoped_refptr<ReadableLogSegment> segment = GetSegmentBySequenceNumber(
     index_entry.segment_sequence_number);
@@ -282,7 +282,7 @@ Status LogReader::ReadReplicatesInRange(int64_t starting_at,
   bool limit_exceeded = false;
   faststring tmp_buf;
   gscoped_ptr<LogEntryBatchPB> batch;
-  for (int index = starting_at; index <= up_to && !limit_exceeded; index++) {
+  for (int64_t index = starting_at; index <= up_to && !limit_exceeded; index++) {
     LogIndexEntry index_entry;
     RETURN_NOT_OK_PREPEND(log_index_->GetEntry(index, &index_entry),
                           Substitute("Failed to read log index for op $0", index));

http://git-wip-us.apache.org/repos/asf/kudu/blob/086d82fe/src/kudu/consensus/log_reader.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_reader.h b/src/kudu/consensus/log_reader.h
index 13ae6f5..ecd9cd3 100644
--- a/src/kudu/consensus/log_reader.h
+++ b/src/kudu/consensus/log_reader.h
@@ -89,7 +89,7 @@ class LogReader {
       int64_t up_to,
       int64_t max_bytes_to_read,
       std::vector<consensus::ReplicateMsg*>* replicates) const;
-  static const int kNoSizeLimit;
+  static const int64_t kNoSizeLimit;
 
   // Look up the OpId for the given operation index.
   // Returns a bad Status if the log index fails to load (eg. due to an IO error).

http://git-wip-us.apache.org/repos/asf/kudu/blob/086d82fe/src/kudu/consensus/opid_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/opid_util.cc b/src/kudu/consensus/opid_util.cc
index 7b43069..c39cb51 100644
--- a/src/kudu/consensus/opid_util.cc
+++ b/src/kudu/consensus/opid_util.cc
@@ -152,7 +152,9 @@ std::string OpsRangeString(const ConsensusRequestPB& req) {
   return ret;
 }
 
-OpId MakeOpId(int term, int index) {
+OpId MakeOpId(int64_t term, int64_t index) {
+  CHECK_GE(term, 0);
+  CHECK_GE(index, 0);
   OpId ret;
   ret.set_index(index);
   ret.set_term(term);

http://git-wip-us.apache.org/repos/asf/kudu/blob/086d82fe/src/kudu/consensus/opid_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/opid_util.h b/src/kudu/consensus/opid_util.h
index 76591fc..7c1a40e 100644
--- a/src/kudu/consensus/opid_util.h
+++ b/src/kudu/consensus/opid_util.h
@@ -89,7 +89,7 @@ struct OpIdCompareFunctor {
 };
 
 // OpId comparison functor that returns true iff left > right. Suitable for use
-// td::sort and std::map to sort keys in increasing order.]
+// with std::sort and std::map to sort keys in increasing order.
 struct OpIdBiggerThanFunctor {
   bool operator() (const OpId& left, const OpId& right) const;
 };
@@ -106,7 +106,7 @@ std::string OpIdToString(const OpId& id);
 
 std::string OpsRangeString(const ConsensusRequestPB& req);
 
-OpId MakeOpId(int term, int index);
+OpId MakeOpId(int64_t term, int64_t index);
 
 }  // namespace consensus
 }  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/086d82fe/src/kudu/integration-tests/ts_recovery-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_recovery-itest.cc b/src/kudu/integration-tests/ts_recovery-itest.cc
index 6ac9755..2110665 100644
--- a/src/kudu/integration-tests/ts_recovery-itest.cc
+++ b/src/kudu/integration-tests/ts_recovery-itest.cc
@@ -15,18 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <memory>
+#include <string>
+
 #include "kudu/client/client.h"
+#include "kudu/consensus/log-test-base.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/consensus_meta.h"
+#include "kudu/consensus/consensus-test-util.h"
+#include "kudu/consensus/opid.pb.h"
+#include "kudu/fs/fs_manager.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/cluster_verifier.h"
 #include "kudu/integration-tests/external_mini_cluster.h"
 #include "kudu/integration-tests/test_workload.h"
+#include "kudu/server/clock.h"
+#include "kudu/server/hybrid_clock.h"
 #include "kudu/util/random.h"
 #include "kudu/util/random_util.h"
 #include "kudu/util/test_util.h"
 
-#include <string>
-
 using std::string;
+using std::unique_ptr;
 
 namespace kudu {
 
@@ -37,6 +47,14 @@ using client::KuduSession;
 using client::KuduTable;
 using client::KuduUpdate;
 using client::sp::shared_ptr;
+using consensus::ConsensusMetadata;
+using consensus::OpId;
+using consensus::RECEIVED_OPID;
+using log::AppendNoOpsToLogSync;
+using log::Log;
+using log::LogOptions;
+using server::Clock;
+using server::HybridClock;
 
 namespace {
 // Generate a row key such that an increasing sequence (0...N) ends up spreading writes
@@ -55,7 +73,7 @@ class TsRecoveryITest : public KuduTest {
   }
 
  protected:
-  void StartCluster(const vector<string>& extra_tserver_flags = vector<string>(),
+  void StartCluster(const vector<string>& extra_tserver_flags = {},
                     int num_tablet_servers = 1);
 
   gscoped_ptr<ExternalMiniCluster> cluster_;
@@ -281,6 +299,125 @@ TEST_F(TsRecoveryITest, TestChangeMaxCellSize) {
                                        MonoDelta::FromSeconds(60)));
 }
 
+class TsRecoveryITestDeathTest : public TsRecoveryITest {};
+
+// Test that tablet bootstrap can automatically repair itself if it finds an
+// overflowed OpId index written to the log caused by KUDU-1933.
+// Also serves as a regression itest for KUDU-1933 by writing ops with a high
+// term and index.
+TEST_F(TsRecoveryITestDeathTest, TestRecoverFromOpIdOverflow) {
+#if defined(THREAD_SANITIZER)
+  // TSAN cannot handle spawning threads after fork().
+  return;
+#endif
+
+  // Create the initial tablet files on disk, then shut down the cluster so we
+  // can meddle with the WAL.
+  NO_FATALS(StartCluster());
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(1);
+  workload.Setup();
+
+  std::unordered_map<std::string, itest::TServerDetails*> ts_map;
+  ValueDeleter del(&ts_map);
+  ASSERT_OK(itest::CreateTabletServerMap(cluster_->master_proxy().get(),
+                                         cluster_->messenger(),
+                                         &ts_map));
+  vector<tserver::ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  auto* ets = cluster_->tablet_server(0);
+  auto* ts = ts_map[ets->uuid()];
+  ASSERT_OK(ListTablets(ts, MonoDelta::FromSeconds(10), &tablets));
+  ASSERT_EQ(1, tablets.size());
+  const string& tablet_id = tablets[0].tablet_status().tablet_id();
+
+  cluster_->Shutdown();
+
+  const int64_t kOverflowedIndexValue = static_cast<int64_t>(INT32_MIN);
+  const int64_t kDesiredIndexValue = static_cast<int64_t>(INT32_MAX) + 1;
+  const int kNumOverflowedEntriesToWrite = 4;
+
+  {
+    // Append a no-op to the WAL with an overflowed term and index to simulate a
+    // crash after KUDU-1933.
+    gscoped_ptr<FsManager> fs_manager(new FsManager(env_, ets->data_dir()));
+    ASSERT_OK(fs_manager->Open());
+    scoped_refptr<Clock> clock(new HybridClock());
+    ASSERT_OK(clock->Init());
+
+    OpId opid;
+    opid.set_term(kOverflowedIndexValue);
+    opid.set_index(kOverflowedIndexValue);
+
+    ASSERT_DEATH({
+      scoped_refptr<Log> log;
+      ASSERT_OK(Log::Open(LogOptions(),
+                          fs_manager.get(),
+                          tablet_id,
+                          SchemaBuilder(GetSimpleTestSchema()).Build(),
+                          0, // schema_version
+                          nullptr,
+                          &log));
+
+      // Write a series of negative OpIds.
+      // This will cause a crash, but only after they have been written to disk.
+      ASSERT_OK(AppendNoOpsToLogSync(clock, log.get(), &opid, kNumOverflowedEntriesToWrite));
+    }, "Check failed: log_index > 0");
+
+    // We also need to update the ConsensusMetadata to match with the term we
+    // want to end up with.
+    unique_ptr<ConsensusMetadata> cmeta;
+    ConsensusMetadata::Load(fs_manager.get(), tablet_id, fs_manager->uuid(), &cmeta);
+    cmeta->set_current_term(kDesiredIndexValue);
+    ASSERT_OK(cmeta->Flush());
+  }
+
+  // Don't become leader because that will append another NO_OP to the log.
+  ets->mutable_flags()->push_back("--enable_leader_failure_detection=false");
+  ASSERT_OK(cluster_->Restart());
+
+  OpId last_written_opid;
+  AssertEventually([&] {
+    // Tablet bootstrap should have converted the negative OpIds to positive ones.
+    ASSERT_OK(itest::GetLastOpIdForReplica(tablet_id, ts, RECEIVED_OPID, MonoDelta::FromSeconds(5),
+                                           &last_written_opid));
+    ASSERT_TRUE(last_written_opid.IsInitialized());
+    OpId expected_opid;
+    expected_opid.set_term(kDesiredIndexValue);
+    expected_opid.set_index(static_cast<int64_t>(INT32_MAX) + kNumOverflowedEntriesToWrite);
+    ASSERT_OPID_EQ(expected_opid, last_written_opid);
+  });
+  NO_FATALS();
+
+  // Now, write some records that will have a higher opid than INT32_MAX and
+  // ensure they get written. This checks for overflows in the write path.
+
+  // We have to first remove the flag disabling failure detection.
+  cluster_->Shutdown();
+  ets->mutable_flags()->pop_back();
+  ASSERT_OK(cluster_->Restart());
+
+  // Write a few records.
+  workload.Start();
+  while (workload.batches_completed() < 100) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+
+  // Validate.
+  OpId prev_written_opid = last_written_opid;
+  AssertEventually([&] {
+    ASSERT_OK(itest::GetLastOpIdForReplica(tablet_id, ts, RECEIVED_OPID, MonoDelta::FromSeconds(5),
+                                           &last_written_opid));
+    ASSERT_TRUE(last_written_opid.IsInitialized());
+    ASSERT_GT(last_written_opid.term(), INT32_MAX);
+    ASSERT_GT(last_written_opid.index(), INT32_MAX);
+    // Term will increase because of an election.
+    ASSERT_GT(last_written_opid.term(), prev_written_opid.term());
+    ASSERT_GT(last_written_opid.index(), prev_written_opid.index());
+  });
+  NO_FATALS();
+  NO_FATALS(cluster_->AssertNoCrashes());
+}
 
 // A set of threads which pick rows which are known to exist in the table
 // and issue random updates against them.

http://git-wip-us.apache.org/repos/asf/kudu/blob/086d82fe/src/kudu/tablet/tablet_bootstrap-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap-test.cc b/src/kudu/tablet/tablet_bootstrap-test.cc
index fa2394a..0d9e508 100644
--- a/src/kudu/tablet/tablet_bootstrap-test.cc
+++ b/src/kudu/tablet/tablet_bootstrap-test.cc
@@ -171,6 +171,29 @@ TEST_F(BootstrapTest, TestBootstrap) {
   ASSERT_EQ(1, results.size());
 }
 
+// Test that we don't overflow opids. Regression test for KUDU-1933.
+TEST_F(BootstrapTest, TestBootstrapHighOpIdIndex) {
+  // Start appending with a log index 3 under the int32 max value.
+  // Append 6 log entries, which will roll us right through the int32 max.
+  const int64_t first_log_index = std::numeric_limits<int32_t>::max() - 3;
+  const int kNumEntries = 6;
+  ASSERT_OK(BuildLog());
+  current_index_ = first_log_index;
+  for (int i = 0; i < kNumEntries; i++) {
+    AppendReplicateBatchAndCommitEntryPairsToLog(1);
+  }
+
+  // Kick off tablet bootstrap and ensure everything worked.
+  shared_ptr<Tablet> tablet;
+  ConsensusBootstrapInfo boot_info;
+  ASSERT_OK(BootstrapTestTablet(-1, -1, &tablet, &boot_info));
+  OpId last_opid;
+  last_opid.set_term(1);
+  last_opid.set_index(current_index_ - 1);
+  ASSERT_OPID_EQ(last_opid, boot_info.last_id);
+  ASSERT_OPID_EQ(last_opid, boot_info.last_committed_id);
+}
+
 // Tests attempting a local bootstrap of a tablet that was in the middle of a
 // tablet copy before "crashing".
 TEST_F(BootstrapTest, TestIncompleteTabletCopy) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/086d82fe/src/kudu/tablet/tablet_bootstrap.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 0749486..04eea5f 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -61,6 +61,8 @@
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/stopwatch.h"
 
+DECLARE_int32(group_commit_queue_size_bytes);
+
 DEFINE_bool(skip_remove_old_recovery_dir, false,
             "Skip removing WAL recovery dir after startup. (useful for debugging)");
 TAG_FLAG(skip_remove_old_recovery_dir, hidden);
@@ -811,10 +813,34 @@ Status TabletBootstrap::HandleEntry(ReplayState* state, LogEntryPB* entry) {
   return Status::OK();
 }
 
+// Repair overflow issue reported in KUDU-1933.
+void CheckAndRepairOpIdOverflow(OpId* opid) {
+  if (PREDICT_FALSE(opid->term() < consensus::kMinimumTerm)) {
+    int64_t overflow = opid->term() - INT32_MIN + 1LL;
+    CHECK_GE(overflow, 1) << OpIdToString(*opid);
+    opid->set_term(static_cast<int64_t>(INT32_MAX) + overflow);
+  }
+  if (PREDICT_FALSE(opid->index() < consensus::kMinimumOpIdIndex &&
+                    opid->index() != consensus::kInvalidOpIdIndex)) {
+    int64_t overflow = opid->index() - INT32_MIN + 1LL;
+    CHECK_GE(overflow, 1) << OpIdToString(*opid);
+    // Sanity check. Even with the bug in KUDU-1933, the number of bytes
+    // allowed in a single group commit is a generous upper bound on how far a
+    // log index may have overflowed before causing a crash.
+    CHECK_LT(overflow, FLAGS_group_commit_queue_size_bytes) << OpIdToString(*opid);
+    opid->set_index(static_cast<int64_t>(INT32_MAX) + overflow);
+  }
+}
+
 // Takes ownership of 'replicate_entry' on OK status.
 Status TabletBootstrap::HandleReplicateMessage(ReplayState* state, LogEntryPB* replicate_entry) {
   stats_.ops_read++;
 
+  DCHECK(replicate_entry->has_replicate());
+
+  // Fix overflow if necessary (see KUDU-1933).
+  CheckAndRepairOpIdOverflow(replicate_entry->mutable_replicate()->mutable_id());
+
   const ReplicateMsg& replicate = replicate_entry->replicate();
   RETURN_NOT_OK(state->CheckSequentialReplicateId(replicate));
   DCHECK(replicate.has_timestamp());
@@ -859,6 +885,9 @@ Status TabletBootstrap::HandleCommitMessage(ReplayState* state, LogEntryPB* comm
   DCHECK(commit_entry->has_commit()) << "Not a commit message: "
                                      << SecureDebugString(*commit_entry);
 
+  // Fix overflow if necessary (see KUDU-1933).
+  CheckAndRepairOpIdOverflow(commit_entry->mutable_commit()->mutable_commited_op_id());
+
   // Match up the COMMIT record with the original entry that it's applied to.
   const OpId& committed_op_id = commit_entry->commit().commited_op_id();
   state->UpdateCommittedOpId(committed_op_id);


Mime
View raw message