kudu-commits mailing list archives

From t...@apache.org
Subject [1/8] kudu git commit: consensus: refactor tracking of received OpIds out of ReplicaState
Date Thu, 06 Oct 2016 00:05:57 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 64f9ab34f -> bce1dd777


consensus: refactor tracking of received OpIds out of ReplicaState

The PeerMessageQueue class was already tracking the last appended OpId, so
tracking it in ReplicaState was redundant and confusing. This removes that
duplicated bookkeeping from ReplicaState and adds just a little bit of new
functionality to PeerMessageQueue:

- TruncateOpsAfter() now takes an index instead of an OpId. The queue
  can already map the index to an OpId by asking the log.
- Added a getter to expose the last OpId in the log back to RaftConsensus
- Changed OpId generation to happen in PeerMessageQueue. This was easy
  because it already knows the previous OpId and the current term.
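
To make the three API changes above concrete, here is a toy model. It is a sketch under simplifying assumptions, not Kudu code. `SimpleOpId` and `ToyMessageQueue` are hypothetical names. The "log" is an in-memory vector with contiguous indexes starting at 1, and a `std::mutex` stands in for the real queue's spinlock and log cache. The sketch shows the caller passing only an index to `TruncateOpsAfter()`, the queue mapping that index back to a full OpId, and the queue generating the next OpId from its current term and last appended index.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <vector>

// Hypothetical stand-in for the OpId protobuf: a (term, index) pair.
struct SimpleOpId {
  int64_t term;
  int64_t index;
};

// Toy queue illustrating the API changes. NOT Kudu's PeerMessageQueue.
class ToyMessageQueue {
 public:
  explicit ToyMessageQueue(int64_t current_term) : current_term_(current_term) {}

  // Append one op whose OpId is generated by the queue itself.
  SimpleOpId AppendNext() {
    std::lock_guard<std::mutex> l(lock_);
    SimpleOpId id = NextOpIdUnlocked();
    log_.push_back(id);
    return id;
  }

  // Truncate everything after 'index'. The caller supplies only the index;
  // the queue maps it back to a full OpId using its own log.
  void TruncateOpsAfter(int64_t index) {
    std::lock_guard<std::mutex> l(lock_);
    SimpleOpId op = LookupOpIdUnlocked(index);
    log_.resize(static_cast<size_t>(op.index));  // 'op' is now the last op
  }

  // Last OpId in the log. This can move backwards after TruncateOpsAfter().
  SimpleOpId GetLastOpIdInLog() const {
    std::lock_guard<std::mutex> l(lock_);
    return LookupOpIdUnlocked(static_cast<int64_t>(log_.size()));
  }

  // Next OpId to be appended in the current term.
  SimpleOpId GetNextOpId() const {
    std::lock_guard<std::mutex> l(lock_);
    return NextOpIdUnlocked();
  }

  void SetTerm(int64_t term) {
    std::lock_guard<std::mutex> l(lock_);
    current_term_ = term;
  }

 private:
  // Index 0 maps to the minimum OpId (0.0); log_[i] holds index i + 1.
  SimpleOpId LookupOpIdUnlocked(int64_t index) const {
    assert(index >= 0 && index <= static_cast<int64_t>(log_.size()) &&
           "cannot look up a bad index");
    return index == 0 ? SimpleOpId{0, 0} : log_[static_cast<size_t>(index - 1)];
  }

  SimpleOpId NextOpIdUnlocked() const {
    int64_t last_index = static_cast<int64_t>(log_.size());
    return SimpleOpId{current_term_, last_index + 1};
  }

  mutable std::mutex lock_;
  int64_t current_term_;
  std::vector<SimpleOpId> log_;
};
```

For example, after appending three ops in term 1 and calling `TruncateOpsAfter(1)`, `GetLastOpIdInLog()` returns 1.1. After a switch to term 2, `GetNextOpId()` returns 2.2. Because the queue owns both the last OpId and the current term, no second copy of this state needs to live in the caller.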

The 'last_received_cur_leader' tracking was moved into RaftConsensus
itself, since it's just transient state tracking the RPC back-and-forths
between a leader and the follower.
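
The follower-side bookkeeping might look roughly like the following. This is a hypothetical, simplified model, not RaftConsensus's actual code. The behavior it encodes is inferred from the deleted `TestResetRcvdFromCurrentLeaderOnNewTerm` test:
- The value resets when a higher term (a new leader) is seen.
- On a successful preceding-OpId match it becomes either that preceding OpId or the last op carried by the request.
- On a log-matching mismatch it is left alone.

The class name `LeaderExchangeTracker` and the `OnRequest` signature are illustrative assumptions.

```cpp
#include <cassert>
#include <cstdint>

struct SimpleOpId {
  int64_t term;
  int64_t index;
};

// Hypothetical model of the transient "last received from current leader"
// state. Not Kudu's implementation.
class LeaderExchangeTracker {
 public:
  // Process one Update() request from a leader.
  //   leader_term:       the caller's term.
  //   preceding_matches: whether the request's preceding OpId was found in
  //                      the local log (the log-matching check).
  //   num_ops, last_op:  how many ops the request carried, and the last one.
  void OnRequest(int64_t leader_term, const SimpleOpId& preceding,
                 bool preceding_matches, int num_ops, const SimpleOpId& last_op) {
    assert(num_ops >= 0);
    if (leader_term < term_) return;  // stale leader: request is rejected
    if (leader_term > term_) {
      // New term means a new leader: forget what the previous one sent us.
      term_ = leader_term;
      last_received_cur_leader_ = SimpleOpId{0, 0};
    }
    if (!preceding_matches) return;  // mismatch: keep the reset value
    last_received_cur_leader_ = num_ops > 0 ? last_op : preceding;
  }

  SimpleOpId last_received_cur_leader() const { return last_received_cur_leader_; }

 private:
  int64_t term_ = 0;
  SimpleOpId last_received_cur_leader_{0, 0};
};
```

Because this value only describes the ongoing exchange with one leader, and can always be rebuilt from the next request, it does not need to live alongside the durable replica state.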

This patch also removes raft_consensus-test, the mock-based testing for
RaftConsensus. I found that maintaining this test was very difficult, in
particular because now we rely on the fact that AppendOperations() is
reflected in GetLastOpIdInLog(). With a mock PeerMessageQueue, this
state update wasn't happening properly, and trying to reproduce that
behavior in the mocks themselves seemed like I was basically
re-implementing the actual production code for the queue. I looked over
the tests in this suite and I believe all of the cases are covered by
various other tests (randomized and otherwise).
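
A small illustration shows why a mocked queue becomes a liability once the caller relies on AppendOperations() being reflected in GetLastOpIdInLog(). The names below are hypothetical and greatly simplified, not Kudu's classes:
- `StubQueue` behaves like a mock whose append does not update queue state.
- `FakeQueue` keeps the invariant, but only by re-implementing the real bookkeeping.
- `AppendOne` stands in for caller logic that derives the next index from the queue.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

struct SimpleOpId {
  int64_t term;
  int64_t index;
};

// Hypothetical minimal queue interface (not Kudu's PeerMessageQueue).
class QueueInterface {
 public:
  virtual ~QueueInterface() = default;
  virtual void AppendOperations(const std::vector<SimpleOpId>& ops) = 0;
  virtual SimpleOpId GetLastOpIdInLog() const = 0;
};

// Accepts appends but records nothing, so GetLastOpIdInLog() never advances.
class StubQueue : public QueueInterface {
 public:
  void AppendOperations(const std::vector<SimpleOpId>&) override {}
  SimpleOpId GetLastOpIdInLog() const override { return SimpleOpId{0, 0}; }
};

// Keeps the invariant the caller depends on, which amounts to duplicating
// the production queue's bookkeeping inside the test double.
class FakeQueue : public QueueInterface {
 public:
  void AppendOperations(const std::vector<SimpleOpId>& ops) override {
    for (const SimpleOpId& op : ops) {
      assert(op.index == last_.index + 1 && "ops must be appended contiguously");
      last_ = op;
    }
  }
  SimpleOpId GetLastOpIdInLog() const override { return last_; }

 private:
  SimpleOpId last_{0, 0};
};

// Caller-side logic: the next index comes from the queue's view of the log
// rather than from a separately tracked copy.
SimpleOpId AppendOne(QueueInterface* queue, int64_t term) {
  SimpleOpId next{term, queue->GetLastOpIdInLog().index + 1};
  queue->AppendOperations({next});
  return next;
}
```

With `FakeQueue`, two calls to `AppendOne` produce indexes 1 and 2. With `StubQueue`, both calls produce index 1, which is the kind of state divergence that made the mock-based suite hard to keep correct.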

I looped raft_consensus-itest 100 times[1], the Churny test case 1000
times[2], and exactly_once_writes-itest 1000 times[3]. Lastly, I was
able to re-enable TestChurnyElections_WithNotificationLatency and loop
it 500 times[4].

[1] http://dist-test.cloudera.org/job?job_id=todd.1474357631.30024
[2] http://dist-test.cloudera.org//job?job_id=todd.1474359004.2328
[3] http://dist-test.cloudera.org//job?job_id=todd.1474358436.31536
[4] http://dist-test.cloudera.org//job?job_id=todd.1474359250.4834

Change-Id: I81614d26328b0fbba37bf279f59717e05a07b816
Reviewed-on: http://gerrit.cloudera.org:8080/4476
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4bcbb4a4
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4bcbb4a4
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4bcbb4a4

Branch: refs/heads/master
Commit: 4bcbb4a405c037162954cba218c2316e517cdbf3
Parents: 64f9ab3
Author: Todd Lipcon <todd@apache.org>
Authored: Tue Sep 20 00:33:45 2016 -0700
Committer: Todd Lipcon <todd@apache.org>
Committed: Wed Oct 5 21:41:26 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/CMakeLists.txt               |   1 -
 src/kudu/consensus/consensus_queue.cc           |  20 +-
 src/kudu/consensus/consensus_queue.h            |  14 +-
 src/kudu/consensus/raft_consensus-test.cc       | 663 -------------------
 src/kudu/consensus/raft_consensus.cc            |  48 +-
 src/kudu/consensus/raft_consensus.h             |   7 +-
 .../consensus/raft_consensus_quorum-test.cc     |   9 +-
 src/kudu/consensus/raft_consensus_state.cc      |  58 +-
 src/kudu/consensus/raft_consensus_state.h       |  35 -
 .../integration-tests/raft_consensus-itest.cc   |   7 +-
 10 files changed, 61 insertions(+), 801 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt
index 88f0c61..da4cc65 100644
--- a/src/kudu/consensus/CMakeLists.txt
+++ b/src/kudu/consensus/CMakeLists.txt
@@ -136,7 +136,6 @@ ADD_KUDU_TEST(mt-log-test)
 ADD_KUDU_TEST(quorum_util-test)
 ADD_KUDU_TEST(raft_consensus_quorum-test)
 ADD_KUDU_TEST(raft_consensus_state-test)
-ADD_KUDU_TEST(raft_consensus-test)
 
 # Our current version of gmock overrides virtual functions without adding
 # the 'override' keyword which, since our move to c++11, make the compiler

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index b153bdd..6598662 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -291,9 +291,13 @@ Status PeerMessageQueue::AppendOperations(const vector<ReplicateRefPtr>& msgs,
   return Status::OK();
 }
 
-void PeerMessageQueue::TruncateOpsAfter(const OpId& op) {
+void PeerMessageQueue::TruncateOpsAfter(int64_t index) {
   DFAKE_SCOPED_LOCK(append_fake_lock_); // should not race with append.
-
+  OpId op;
+  CHECK_OK_PREPEND(log_cache_.LookupOpId(index, &op),
+                   Substitute("$0: cannot truncate ops after bad index $1",
+                              LogPrefixUnlocked(),
+                              index));
   {
     std::unique_lock<simple_spinlock> lock(queue_lock_);
     queue_state_.last_appended = op;
@@ -301,6 +305,18 @@ void PeerMessageQueue::TruncateOpsAfter(const OpId& op) {
   log_cache_.TruncateOpsAfter(op.index());
 }
 
+OpId PeerMessageQueue::GetLastOpIdInLog() const {
+  std::unique_lock<simple_spinlock> lock(queue_lock_);
+  return queue_state_.last_appended;
+}
+
+OpId PeerMessageQueue::GetNextOpId() const {
+  std::unique_lock<simple_spinlock> lock(queue_lock_);
+  return MakeOpId(queue_state_.current_term,
+                  queue_state_.last_appended.index() + 1);
+}
+
+
 Status PeerMessageQueue::RequestForPeer(const string& uuid,
                                         ConsensusRequestPB* request,
                                         vector<ReplicateRefPtr>* msg_refs,

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index abcc089..c5fc98c 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -181,9 +181,17 @@ class PeerMessageQueue {
   virtual Status AppendOperations(const std::vector<ReplicateRefPtr>& msgs,
                                   const StatusCallback& log_append_callback);
 
-  // Truncate all operations coming after 'op'. Following this, the 'last_appended'
-  // operation is reset to 'op', and the log cache will be truncated accordingly.
-  virtual void TruncateOpsAfter(const OpId& op);
+  // Truncate all operations coming after 'index'. Following this, the 'last_appended'
+  // operation is reset to the OpId with this index, and the log cache will be truncated
+  // accordingly.
+  virtual void TruncateOpsAfter(int64_t index);
+
+  // Return the last OpId in the log.
+  // Note that this can move backwards after a truncation (TruncateOpsAfter).
+  virtual OpId GetLastOpIdInLog() const;
+
+  // Return the next OpId to be appended to the queue in the current term.
+  virtual OpId GetNextOpId() const;
 
   // Assembles a request for a peer, adding entries past 'op_id' up to
   // 'consensus_max_batch_size_bytes'.

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/raft_consensus-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus-test.cc b/src/kudu/consensus/raft_consensus-test.cc
deleted file mode 100644
index 147d626..0000000
--- a/src/kudu/consensus/raft_consensus-test.cc
+++ /dev/null
@@ -1,663 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-#include <memory>
-
-#include "kudu/common/schema.h"
-#include "kudu/common/wire_protocol-test-util.h"
-#include "kudu/consensus/consensus_peers.h"
-#include "kudu/consensus/consensus-test-util.h"
-#include "kudu/consensus/log.h"
-#include "kudu/consensus/peer_manager.h"
-#include "kudu/fs/fs_manager.h"
-#include "kudu/gutil/stl_util.h"
-#include "kudu/server/logical_clock.h"
-#include "kudu/util/async_util.h"
-#include "kudu/util/mem_tracker.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/test_macros.h"
-#include "kudu/util/test_util.h"
-
-DECLARE_bool(enable_leader_failure_detection);
-
-METRIC_DECLARE_entity(tablet);
-
-using std::shared_ptr;
-using std::string;
-using std::unique_ptr;
-
-namespace kudu {
-namespace consensus {
-
-using log::Log;
-using log::LogOptions;
-using ::testing::_;
-using ::testing::AnyNumber;
-using ::testing::AtLeast;
-using ::testing::InSequence;
-using ::testing::Invoke;
-using ::testing::Mock;
-using ::testing::Return;
-
-const char* kTestTablet = "TestTablet";
-const char* kLocalPeerUuid = "peer-0";
-
-// A simple map to collect the results of a sequence of transactions.
-typedef std::map<OpId, Status, OpIdCompareFunctor> StatusesMap;
-
-class MockQueue : public PeerMessageQueue {
- public:
-  explicit MockQueue(const scoped_refptr<MetricEntity>& metric_entity, log::Log* log)
-    : PeerMessageQueue(metric_entity, log, FakeRaftPeerPB(kLocalPeerUuid), kTestTablet) {}
-  MOCK_METHOD1(Init, void(const OpId& locally_replicated_index));
-  MOCK_METHOD3(SetLeaderMode, void(int64_t committed_opid,
-                                   int64_t current_term,
-                                   const RaftConfigPB& active_config));
-  MOCK_METHOD0(SetNonLeaderMode, void());
-  virtual Status AppendOperations(const vector<ReplicateRefPtr>& msgs,
-                                  const StatusCallback& callback) OVERRIDE {
-    return AppendOperationsMock(msgs, callback);
-  }
-  MOCK_METHOD2(AppendOperationsMock, Status(const vector<ReplicateRefPtr>& msgs,
-                                            const StatusCallback& callback));
-  MOCK_METHOD1(TruncateOpsAfter, void(const OpId& op_id));
-  MOCK_METHOD1(TrackPeer, void(const string&));
-  MOCK_METHOD1(UntrackPeer, void(const string&));
-  MOCK_METHOD4(RequestForPeer, Status(const std::string& uuid,
-                                      ConsensusRequestPB* request,
-                                      std::vector<ReplicateRefPtr>* msg_refs,
-                                      bool* needs_tablet_copy));
-  MOCK_METHOD3(ResponseFromPeer, void(const std::string& peer_uuid,
-                                      const ConsensusResponsePB& response,
-                                      bool* more_pending));
-  MOCK_METHOD0(Close, void());
-};
-
-class MockPeerManager : public PeerManager {
- public:
-  MockPeerManager() : PeerManager("", "", nullptr, nullptr, nullptr, nullptr) {}
-  MOCK_METHOD1(UpdateRaftConfig, Status(const consensus::RaftConfigPB& config));
-  MOCK_METHOD1(SignalRequest, void(bool force_if_queue_empty));
-  MOCK_METHOD0(Close, void());
-};
-
-class RaftConsensusSpy : public RaftConsensus {
- public:
-  typedef Callback<Status(const scoped_refptr<ConsensusRound>& round)> AppendCallback;
-
-  RaftConsensusSpy(const ConsensusOptions& options,
-                   unique_ptr<ConsensusMetadata> cmeta,
-                   gscoped_ptr<PeerProxyFactory> proxy_factory,
-                   gscoped_ptr<PeerMessageQueue> queue,
-                   gscoped_ptr<PeerManager> peer_manager,
-                   gscoped_ptr<ThreadPool> thread_pool,
-                   const scoped_refptr<MetricEntity>& metric_entity,
-                   const std::string& peer_uuid,
-                   const scoped_refptr<server::Clock>& clock,
-                   ReplicaTransactionFactory* txn_factory,
-                   const scoped_refptr<log::Log>& log,
-                   const shared_ptr<MemTracker>& parent_mem_tracker,
-                   const Callback<void(const std::string& reason)>& mark_dirty_clbk)
-    : RaftConsensus(options,
-                    std::move(cmeta),
-                    std::move(proxy_factory),
-                    std::move(queue),
-                    std::move(peer_manager),
-                    std::move(thread_pool),
-                    metric_entity,
-                    peer_uuid,
-                    clock,
-                    txn_factory,
-                    log,
-                    parent_mem_tracker,
-                    mark_dirty_clbk) {
-    // These "aliases" allow us to count invocations and assert on them.
-    ON_CALL(*this, StartConsensusOnlyRoundUnlocked(_))
-        .WillByDefault(Invoke(this,
-              &RaftConsensusSpy::StartNonLeaderConsensusRoundUnlockedConcrete));
-    ON_CALL(*this, NonTxRoundReplicationFinished(_, _, _))
-        .WillByDefault(Invoke(this, &RaftConsensusSpy::NonTxRoundReplicationFinishedConcrete));
-  }
-
-  MOCK_METHOD1(AppendNewRoundToQueueUnlocked, Status(const scoped_refptr<ConsensusRound>& round));
-  Status AppendNewRoundToQueueUnlockedConcrete(const scoped_refptr<ConsensusRound>& round) {
-    return RaftConsensus::AppendNewRoundToQueueUnlocked(round);
-  }
-
-  MOCK_METHOD1(StartConsensusOnlyRoundUnlocked, Status(const ReplicateRefPtr& msg));
-  Status StartNonLeaderConsensusRoundUnlockedConcrete(const ReplicateRefPtr& msg) {
-    return RaftConsensus::StartConsensusOnlyRoundUnlocked(msg);
-  }
-
-  MOCK_METHOD3(NonTxRoundReplicationFinished, void(ConsensusRound* round,
-                                                   const StatusCallback& client_cb,
-                                                   const Status& status));
-  void NonTxRoundReplicationFinishedConcrete(ConsensusRound* round,
-                                             const StatusCallback& client_cb,
-                                             const Status& status) {
-    LOG(INFO) << "Committing round with opid " << round->id()
-              << " given Status " << status.ToString();
-    RaftConsensus::NonTxRoundReplicationFinished(round, client_cb, status);
-  }
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(RaftConsensusSpy);
-};
-
-void DoNothing(const string& s) {
-}
-
-class RaftConsensusTest : public KuduTest {
- public:
-  RaftConsensusTest()
-      : clock_(server::LogicalClock::CreateStartingAt(Timestamp(0))),
-        metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "raft-consensus-test")),
-        schema_(GetSimpleTestSchema()) {
-    FLAGS_enable_leader_failure_detection = false;
-    options_.tablet_id = kTestTablet;
-  }
-
-  virtual void SetUp() OVERRIDE {
-    LogOptions options;
-    string test_path = GetTestPath("test-peer-root");
-
-    // TODO mock the Log too, since we're gonna mock the queue
-    // monitors and pretty much everything else.
-    fs_manager_.reset(new FsManager(env_.get(), test_path));
-    CHECK_OK(fs_manager_->CreateInitialFileSystemLayout());
-    CHECK_OK(fs_manager_->Open());
-    CHECK_OK(Log::Open(LogOptions(),
-                       fs_manager_.get(),
-                       kTestTablet,
-                       schema_,
-                       0, // schema_version
-                       nullptr,
-                       &log_));
-
-    queue_ = new MockQueue(metric_entity_, log_.get());
-    peer_manager_ = new MockPeerManager;
-    txn_factory_.reset(new MockTransactionFactory);
-
-    ON_CALL(*queue_, AppendOperationsMock(_, _))
-        .WillByDefault(Invoke(this, &RaftConsensusTest::AppendToLog));
-  }
-
-  void SetUpConsensus(int64_t initial_term = consensus::kMinimumTerm, int num_peers = 1) {
-    config_ = BuildRaftConfigPBForTests(num_peers);
-    config_.set_opid_index(kInvalidOpIdIndex);
-
-    gscoped_ptr<PeerProxyFactory> proxy_factory(new LocalTestPeerProxyFactory(nullptr));
-
-    string peer_uuid = config_.peers(num_peers - 1).permanent_uuid();
-
-    unique_ptr<ConsensusMetadata> cmeta;
-    CHECK_OK(ConsensusMetadata::Create(fs_manager_.get(), kTestTablet, peer_uuid,
-                                       config_, initial_term, &cmeta));
-
-    gscoped_ptr<ThreadPool> thread_pool;
-    CHECK_OK(ThreadPoolBuilder("raft-pool") .Build(&thread_pool));
-
-    consensus_.reset(new RaftConsensusSpy(options_,
-                                          std::move(cmeta),
-                                          std::move(proxy_factory),
-                                          gscoped_ptr<PeerMessageQueue>(queue_),
-                                          gscoped_ptr<PeerManager>(peer_manager_),
-                                          std::move(thread_pool),
-                                          metric_entity_,
-                                          peer_uuid,
-                                          clock_,
-                                          txn_factory_.get(),
-                                          log_.get(),
-                                          MemTracker::GetRootTracker(),
-                                          Bind(&DoNothing)));
-
-    ON_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
-        .WillByDefault(Invoke(this, &RaftConsensusTest::MockAppendNewRound));
-  }
-
-  Status AppendToLog(const vector<ReplicateRefPtr>& msgs,
-                     const StatusCallback& callback) {
-    return log_->AsyncAppendReplicates(msgs,
-                                       Bind(LogAppendCallback, callback));
-  }
-
-  static void LogAppendCallback(const StatusCallback& callback,
-                                const Status& s) {
-    CHECK_OK(s);
-    callback.Run(s);
-  }
-
-  Status MockAppendNewRound(const scoped_refptr<ConsensusRound>& round) {
-    rounds_.push_back(round);
-    RETURN_NOT_OK(consensus_->AppendNewRoundToQueueUnlockedConcrete(round));
-    LOG(INFO) << "Round append: " << round->id() << ", ReplicateMsg: "
-              << round->replicate_msg()->ShortDebugString();
-    return Status::OK();
-  }
-
-  void SetUpGeneralExpectations() {
-    EXPECT_CALL(*peer_manager_, SignalRequest(_))
-        .Times(AnyNumber());
-    EXPECT_CALL(*peer_manager_, Close())
-        .Times(AtLeast(1));
-    EXPECT_CALL(*queue_, Close())
-        .Times(1);
-    EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
-        .Times(AnyNumber());
-  }
-
-  // Create a ConsensusRequestPB suitable to send to a peer.
-  ConsensusRequestPB MakeConsensusRequest(int64_t caller_term,
-                                          const string& caller_uuid,
-                                          const OpId& preceding_opid);
-
-  // Add a single no-op with the given OpId to a ConsensusRequestPB.
-  void AddNoOpToConsensusRequest(ConsensusRequestPB* request, const OpId& noop_opid);
-
-  scoped_refptr<ConsensusRound> AppendNoOpRound() {
-    ReplicateRefPtr replicate_ptr(make_scoped_refptr_replicate(new ReplicateMsg));
-    replicate_ptr->get()->set_op_type(NO_OP);
-    replicate_ptr->get()->set_timestamp(clock_->Now().ToUint64());
-    scoped_refptr<ConsensusRound> round(new ConsensusRound(consensus_.get(), replicate_ptr));
-    round->SetConsensusReplicatedCallback(
-        Bind(&RaftConsensusSpy::NonTxRoundReplicationFinished,
-             Unretained(consensus_.get()), Unretained(round.get()), Bind(&DoNothingStatusCB)));
-
-    CHECK_OK(consensus_->Replicate(round));
-    LOG(INFO) << "Appended NO_OP round with opid " << round->id();
-    return round;
-  }
-
-  void DumpRounds() {
-    LOG(INFO) << "Dumping rounds...";
-    for (const scoped_refptr<ConsensusRound>& round : rounds_) {
-      LOG(INFO) << "Round: OpId " << round->id() << ", ReplicateMsg: "
-                << round->replicate_msg()->ShortDebugString();
-    }
-  }
-
- protected:
-  ConsensusOptions options_;
-  RaftConfigPB config_;
-  OpId initial_id_;
-  gscoped_ptr<FsManager> fs_manager_;
-  scoped_refptr<Log> log_;
-  gscoped_ptr<PeerProxyFactory> proxy_factory_;
-  scoped_refptr<server::Clock> clock_;
-  MetricRegistry metric_registry_;
-  scoped_refptr<MetricEntity> metric_entity_;
-  const Schema schema_;
-  scoped_refptr<RaftConsensusSpy> consensus_;
-
-  vector<scoped_refptr<ConsensusRound> > rounds_;
-
-  // Mocks.
-  // NOTE: both 'queue_' and 'peer_manager_' belong to 'consensus_' and may be deleted before
-  // the test is.
-  MockQueue* queue_;
-  MockPeerManager* peer_manager_;
-  gscoped_ptr<MockTransactionFactory> txn_factory_;
-};
-
-ConsensusRequestPB RaftConsensusTest::MakeConsensusRequest(int64_t caller_term,
-                                                           const string& caller_uuid,
-                                                           const OpId& preceding_opid) {
-  ConsensusRequestPB request;
-  request.set_caller_term(caller_term);
-  request.set_caller_uuid(caller_uuid);
-  request.set_tablet_id(kTestTablet);
-  request.set_all_replicated_index(0);
-  *request.mutable_preceding_id() = preceding_opid;
-  return request;
-}
-
-void RaftConsensusTest::AddNoOpToConsensusRequest(ConsensusRequestPB* request,
-                                                  const OpId& noop_opid) {
-  ReplicateMsg* noop_msg = request->add_ops();
-  *noop_msg->mutable_id() = noop_opid;
-  noop_msg->set_op_type(NO_OP);
-  noop_msg->set_timestamp(clock_->Now().ToUint64());
-  noop_msg->mutable_noop_request();
-}
-
-// Asserts that a ConsensusRound has an OpId set in its ReplicateMsg.
-MATCHER(HasOpId, "") { return arg->id().IsInitialized(); }
-
-// These matchers assert that a Status object is of a certain type.
-MATCHER(IsOk, "") { return arg.ok(); }
-MATCHER(IsAborted, "") { return arg.IsAborted(); }
-
-// Tests that consensus is able to handle pending operations. It tests this in two ways:
-// - It tests that consensus does the right thing with pending transactions from the the WAL.
-// - It tests that when a follower gets promoted to leader it does the right thing
-//   with the pending operations.
-TEST_F(RaftConsensusTest, TestPendingTransactions) {
-  SetUpConsensus(10);
-
-  // Emulate a stateful system by having a bunch of operations in flight when consensus starts.
-  // Specifically we emulate we're on term 10, with 5 operations before the last known
-  // committed operation, 10.104, which should be committed immediately, and 5 operations after the
-  // last known committed operation, which should be pending but not yet committed.
-  ConsensusBootstrapInfo info;
-  info.last_id.set_term(10);
-  for (int i = 0; i < 10; i++) {
-    auto replicate = new ReplicateMsg();
-    replicate->set_op_type(NO_OP);
-    info.last_id.set_index(100L + i);
-    replicate->mutable_id()->CopyFrom(info.last_id);
-    info.orphaned_replicates.push_back(replicate);
-  }
-
-  info.last_committed_id.set_term(10);
-  info.last_committed_id.set_index(104);
-
-  {
-    InSequence dummy;
-    // On start we expect 10 NO_OPs to be enqueued, with 5 of those having
-    // their commit continuation called immediately.
-    EXPECT_CALL(*consensus_.get(), StartConsensusOnlyRoundUnlocked(_))
-        .Times(10);
-
-    // Queue gets initted when the peer starts.
-    EXPECT_CALL(*queue_, Init(_))
-      .Times(1);
-  }
-
-  ASSERT_OK(consensus_->Start(info));
-
-  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(queue_));
-  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(txn_factory_.get()));
-  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(peer_manager_));
-  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(consensus_.get()));
-  // Now we test what this peer does with the pending operations once it's elected leader.
-  {
-    InSequence dummy;
-    // Peer manager gets updated with the new set of peers to send stuff to.
-    EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_))
-        .Times(1).WillOnce(Return(Status::OK()));
-    // The no-op should be appended to the queue.
-    // One more op will be appended for the election.
-    EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
-        .Times(1);
-    EXPECT_CALL(*queue_, AppendOperationsMock(_, _))
-        .Times(1).WillRepeatedly(Return(Status::OK()));;
-  }
-
-  // Emulate an election, this will make this peer become leader and trigger the
-  // above set expectations.
-  ASSERT_OK(consensus_->EmulateElection());
-
-  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(queue_));
-  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(txn_factory_.get()));
-  ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(peer_manager_));
-
-  // Commit the 5 no-ops from the previous term, along with the one pushed to
-  // assert leadership.
-  EXPECT_CALL(*consensus_.get(), NonTxRoundReplicationFinished(HasOpId(), _, IsOk()))
-      .Times(6);
-  EXPECT_CALL(*peer_manager_, SignalRequest(_))
-      .Times(AnyNumber());
-  // In the end peer manager and the queue get closed.
-  EXPECT_CALL(*peer_manager_, Close())
-      .Times(AtLeast(1));
-  EXPECT_CALL(*queue_, Close())
-      .Times(1);
-
-  // Now mark the last operation (the no-op round) as committed.
-  // This should advance the committed index, since that round in on our current term,
-  // and we should be able to commit all previous rounds.
-  int64_t cc_round_index = info.orphaned_replicates.back()->id().index() + 1;
-  consensus_->NotifyCommitIndex(cc_round_index);
-}
-
-MATCHER_P2(RoundHasOpId, term, index, "") {
-  LOG(INFO) << "expected: " << MakeOpId(term, index) << ", actual: " << arg->id();
-  return arg->id().term() == term && arg->id().index() == index;
-}
-
-MATCHER_P2(EqOpId, term, index, "") {
-  return arg.term() == term && arg.index() == index;
-}
-
-// Tests the case where a a leader is elected and pushed a sequence of
-// operations of which some never get committed. Eventually a new leader in a higher
-// term pushes operations that overwrite some of the original indexes.
-TEST_F(RaftConsensusTest, TestAbortOperations) {
-  SetUpConsensus(1, 2);
-
-  EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_))
-      .Times(AnyNumber());
-
-  EXPECT_CALL(*peer_manager_, SignalRequest(_))
-      .Times(AnyNumber());
-  EXPECT_CALL(*peer_manager_, Close())
-      .Times(AtLeast(1));
-  EXPECT_CALL(*queue_, Close())
-      .Times(1);
-  EXPECT_CALL(*queue_, Init(_))
-      .Times(1);
-  EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_))
-      .Times(1)
-      .WillRepeatedly(Return(Status::OK()));
-
-  // We'll append to the queue 12 times, the initial noop txn + 10 initial ops while leader
-  // and the new leader's update, when we're overwriting operations.
-  EXPECT_CALL(*queue_, AppendOperationsMock(_, _))
-      .Times(12);
-
-  // .. but those will be overwritten later by another
-  // leader, which will push and commit 5 ops.
-  // Only these five should start as replica rounds.
-  EXPECT_CALL(*consensus_.get(), StartConsensusOnlyRoundUnlocked(_))
-      .Times(4);
-
-  ConsensusBootstrapInfo info;
-  ASSERT_OK(consensus_->Start(info));
-  ASSERT_OK(consensus_->EmulateElection());
-
-  // Append 10 rounds: 2.2 - 2.11
-  for (int i = 0; i < 10; i++) {
-    AppendNoOpRound();
-  }
-
-  // Expectations for what gets committed and what gets aborted:
-  // (note: the aborts may be triggered before the commits)
-  // 5 OK's for the 2.1-2.5 ops.
-  // 6 Aborts for the 2.6-2.11 ops.
-  // 1 OK for the 3.6 op.
-  for (int index = 1; index < 6; index++) {
-    EXPECT_CALL(*consensus_.get(),
-                NonTxRoundReplicationFinished(RoundHasOpId(2, index), _, IsOk())).Times(1);
-  }
-  for (int index = 6; index < 12; index++) {
-    EXPECT_CALL(*consensus_.get(),
-                NonTxRoundReplicationFinished(RoundHasOpId(2, index), _, IsAborted())).Times(1);
-  }
-  EXPECT_CALL(*consensus_.get(),
-              NonTxRoundReplicationFinished(RoundHasOpId(3, 6), _, IsOk())).Times(1);
-  EXPECT_CALL(*queue_, TruncateOpsAfter(EqOpId(2, 5))).Times(1);
-
-  // Nothing's committed so far, so now just send an Update() message
-  // emulating another guy got elected leader and is overwriting a suffix
-  // of the previous messages.
-  // In particular this request has:
-  // - Op 2.5 from the previous leader's term
-  // - Ops 3.6-3.9 from the new leader's term
-  // - A new committed index of 3.6
-  ConsensusRequestPB request;
-  request.set_caller_term(3);
-  const string PEER_0_UUID = "peer-0";
-  request.set_caller_uuid(PEER_0_UUID);
-  request.set_tablet_id(kTestTablet);
-  request.set_all_replicated_index(0);
-  request.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
-
-  ReplicateMsg* replicate = request.add_ops();
-  replicate->mutable_id()->CopyFrom(MakeOpId(2, 5));
-  replicate->set_op_type(NO_OP);
-
-  ReplicateMsg* noop_msg = request.add_ops();
-  noop_msg->mutable_id()->CopyFrom(MakeOpId(3, 6));
-  noop_msg->set_op_type(NO_OP);
-  noop_msg->set_timestamp(clock_->Now().ToUint64());
-  noop_msg->mutable_noop_request();
-
-  // Overwrite another 3 of the original rounds for a total of 4 overwrites.
-  for (int i = 7; i < 10; i++) {
-    ReplicateMsg* replicate = request.add_ops();
-    replicate->mutable_id()->CopyFrom(MakeOpId(3, i));
-    replicate->set_op_type(NO_OP);
-    replicate->set_timestamp(clock_->Now().ToUint64());
-  }
-
-  request.set_committed_index(6);
-
-  ConsensusResponsePB response;
-  ASSERT_OK(consensus_->Update(&request, &response));
-  ASSERT_FALSE(response.has_error());
-
-  ASSERT_TRUE(Mock::VerifyAndClearExpectations(consensus_.get()));
-
-  // Now we expect to commit ops 3.7 - 3.9.
-  for (int index = 7; index < 10; index++) {
-    EXPECT_CALL(*consensus_.get(),
-                NonTxRoundReplicationFinished(RoundHasOpId(3, index), _, IsOk())).Times(1);
-  }
-
-  request.mutable_ops()->Clear();
-  request.mutable_preceding_id()->CopyFrom(MakeOpId(3, 9));
-  request.set_committed_index(9);
-
-  ASSERT_OK(consensus_->Update(&request, &response));
-  ASSERT_FALSE(response.has_error());
-}
-
-TEST_F(RaftConsensusTest, TestReceivedIdIsInittedBeforeStart) {
-  SetUpConsensus();
-  OpId opid;
-  ASSERT_OK(consensus_->GetLastOpId(RECEIVED_OPID, &opid));
-  ASSERT_TRUE(opid.IsInitialized());
-  ASSERT_OPID_EQ(opid, MinimumOpId());
-}
-
-// Ensure that followers reset their "last_received_current_leader"
-// ConsensusStatusPB field when a new term is encountered. This is a
-// correctness test for the logic on the follower side that allows the
-// leader-side queue to determine which op to send next in various scenarios.
-TEST_F(RaftConsensusTest, TestResetRcvdFromCurrentLeaderOnNewTerm) {
-  SetUpConsensus(kMinimumTerm, 3);
-  SetUpGeneralExpectations();
-  ConsensusBootstrapInfo info;
-  ASSERT_OK(consensus_->Start(info));
-
-  ConsensusRequestPB request;
-  ConsensusResponsePB response;
-  int64_t caller_term = 0;
-  int64_t log_index = 0;
-
-  caller_term = 1;
-  string caller_uuid = config_.peers(0).permanent_uuid();
-  OpId preceding_opid = MinimumOpId();
-
-  // Heartbeat. This will cause the term to increment on the follower.
-  request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid);
-  response.Clear();
-  ASSERT_OK(consensus_->Update(&request, &response));
-  ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString();
-  ASSERT_EQ(caller_term, response.responder_term());
-  ASSERT_OPID_EQ(response.status().last_received(), MinimumOpId());
-  ASSERT_OPID_EQ(response.status().last_received_current_leader(), MinimumOpId());
-
-  // Replicate a no-op.
-  OpId noop_opid = MakeOpId(caller_term, ++log_index);
-  AddNoOpToConsensusRequest(&request, noop_opid);
-  response.Clear();
-  ASSERT_OK(consensus_->Update(&request, &response));
-  ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString();
-  ASSERT_OPID_EQ(response.status().last_received(), noop_opid);
-  ASSERT_OPID_EQ(response.status().last_received_current_leader(),  noop_opid);
-
-  // New leader heartbeat. Term increase to 2.
-  // The preceding_opid is the no-op replicated above. This will match on the
-  // follower side, so it can update its last_received_current_leader to
-  // the same operation (indicating to the queue that it doesn't need to re-replicate
-  // this operation).
-  caller_term = 2;
-  caller_uuid = config_.peers(1).permanent_uuid();
-  preceding_opid = noop_opid;
-  request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid);
-  response.Clear();
-  ASSERT_OK(consensus_->Update(&request, &response));
-  ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString();
-  ASSERT_EQ(caller_term, response.responder_term());
-  ASSERT_OPID_EQ(response.status().last_received(), preceding_opid);
-  ASSERT_OPID_EQ(response.status().last_received_current_leader(), preceding_opid);
-
-  // Append a no-op.
-  noop_opid = MakeOpId(caller_term, ++log_index);
-  AddNoOpToConsensusRequest(&request, noop_opid);
-  response.Clear();
-  ASSERT_OK(consensus_->Update(&request, &response));
-  ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString();
-  ASSERT_OPID_EQ(response.status().last_received(), noop_opid);
-  ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid);
-
-  // New leader heartbeat. The term should rev but we should get an LMP mismatch.
-  caller_term = 3;
-  caller_uuid = config_.peers(0).permanent_uuid();
-  preceding_opid = MakeOpId(caller_term, log_index + 1); // Not replicated yet.
-  request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid);
-  response.Clear();
-  ASSERT_OK(consensus_->Update(&request, &response));
-  ASSERT_EQ(caller_term, response.responder_term());
-  ASSERT_OPID_EQ(response.status().last_received(), noop_opid); // Not preceding this time.
-  ASSERT_OPID_EQ(response.status().last_received_current_leader(), MinimumOpId());
-  ASSERT_TRUE(response.status().has_error()) << response.ShortDebugString();
-  ASSERT_EQ(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH, response.status().error().code());
-
-  // Decrement preceding and append a no-op.
-  preceding_opid = MakeOpId(2, log_index);
-  noop_opid = MakeOpId(caller_term, ++log_index);
-  request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid);
-  AddNoOpToConsensusRequest(&request, noop_opid);
-  response.Clear();
-  ASSERT_OK(consensus_->Update(&request, &response));
-  ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString();
-  ASSERT_OPID_EQ(response.status().last_received(), noop_opid) << response.ShortDebugString();
-  ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid)
-      << response.ShortDebugString();
-
-  // Happy case. New leader with new no-op to append right off the bat.
-  // Response should be OK with all last_received* fields equal to the new no-op.
-  caller_term = 4;
-  caller_uuid = config_.peers(1).permanent_uuid();
-  preceding_opid = noop_opid;
-  noop_opid = MakeOpId(caller_term, ++log_index);
-  request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid);
-  AddNoOpToConsensusRequest(&request, noop_opid);
-  response.Clear();
-  ASSERT_OK(consensus_->Update(&request, &response));
-  ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString();
-  ASSERT_EQ(caller_term, response.responder_term());
-  ASSERT_OPID_EQ(response.status().last_received(), noop_opid);
-  ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid);
-}
-
-}  // namespace consensus
-}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index aaaf228..0cb724e 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -231,6 +231,7 @@ RaftConsensus::RaftConsensus(
           FLAGS_raft_heartbeat_interval_ms *
           FLAGS_leader_failure_max_missed_heartbeat_periods))),
       withhold_votes_until_(MonoTime::Min()),
+      last_received_cur_leader_(MinimumOpId()),
       mark_dirty_clbk_(std::move(mark_dirty_clbk)),
       shutdown_(false),
       follower_memory_pressure_rejections_(metric_entity->FindOrCreateCounter(
@@ -283,7 +284,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
 
     state_->SetInitialCommittedOpIdUnlocked(info.last_committed_id);
 
-    queue_->Init(state_->GetLastReceivedOpIdUnlocked());
+    queue_->Init(info.last_id);
   }
 
   {
@@ -413,7 +414,7 @@ Status RaftConsensus::StartElection(ElectionMode mode) {
     request.set_candidate_term(state_->GetCurrentTermUnlocked());
     request.set_tablet_id(state_->GetOptions().tablet_id);
     *request.mutable_candidate_status()->mutable_last_received() =
-        state_->GetLastReceivedOpIdUnlocked();
+        queue_->GetLastOpIdInLog();
 
     election.reset(new LeaderElection(active_config,
                                       peer_proxy_factory_.get(),
@@ -548,7 +549,7 @@ Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRo
 }
 
 Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<ConsensusRound>& round) {
-  state_->NewIdUnlocked(round->replicate_msg()->mutable_id());
+  *round->replicate_msg()->mutable_id() = queue_->GetNextOpId();
   RETURN_NOT_OK(state_->AddPendingOperation(round));
 
   Status s = queue_->AppendOperation(round->replicate_scoped_refptr());
@@ -556,23 +557,17 @@ Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<Consensu
   // Handle Status::ServiceUnavailable(), which means the queue is full.
   if (PREDICT_FALSE(s.IsServiceUnavailable())) {
     gscoped_ptr<OpId> id(round->replicate_msg()->release_id());
-    // Rollback the id gen. so that we reuse this id later, when we can
-    // actually append to the state machine, i.e. this makes the state
-    // machine have continuous ids, for the same term, even if the queue
-    // refused to add any more operations.
+    // Cancel the operation that we started.
     state_->CancelPendingOperation(*id);
     LOG_WITH_PREFIX_UNLOCKED(WARNING) << ": Could not append replicate request "
                  << "to the queue. Queue is Full. "
                  << "Queue metrics: " << queue_->ToString();
-
-    // TODO Possibly evict a dangling peer from the configuration here.
-    // TODO count of number of ops failed due to consensus queue overflow.
+    // TODO(todd) count of number of ops failed due to consensus queue overflow.
   } else if (PREDICT_FALSE(s.IsIOError())) {
     // This likely came from the log.
     LOG(FATAL) << "IO error appending to the queue: " << s.ToString();
   }
   RETURN_NOT_OK_PREPEND(s, "Unable to append operation to consensus queue");
-  state_->UpdateLastReceivedOpIdUnlocked(round->id());
   return Status::OK();
 }
 
@@ -754,7 +749,7 @@ void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req
   // The leader's preceding id.
   deduplicated_req->preceding_opid = &rpc_req->preceding_id();
 
-  int64_t dedup_up_to_index = state_->GetLastReceivedOpIdUnlocked().index();
+  int64_t dedup_up_to_index = queue_->GetLastOpIdInLog().index();
 
   deduplicated_req->first_message_idx = -1;
 
@@ -844,7 +839,7 @@ Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequ
   string error_msg = Substitute(
     "Log matching property violated."
     " Preceding OpId in replica: $0. Preceding OpId from leader: $1. ($2 mismatch)",
-    state_->GetLastReceivedOpIdUnlocked().ShortDebugString(),
+    queue_->GetLastOpIdInLog().ShortDebugString(),
     req.preceding_opid->ShortDebugString(),
     term_mismatch ? "term" : "index");
 
@@ -875,10 +870,7 @@ Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequ
 
 void RaftConsensus::TruncateAndAbortOpsAfterUnlocked(int64_t truncate_after_index) {
   state_->AbortOpsAfterUnlocked(truncate_after_index);
-  // Above resets the 'last received' to the operation with index 'truncate_after_index'.
-  OpId new_last_received = state_->GetLastReceivedOpIdUnlocked();
-  DCHECK_EQ(truncate_after_index, new_last_received.index());
-  queue_->TruncateOpsAfter(new_last_received);
+  queue_->TruncateOpsAfter(truncate_after_index);
 }
 
 Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* request,
@@ -1215,16 +1207,10 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     CHECK_OK(state_->AdvanceCommittedIndexUnlocked(apply_up_to));
     queue_->UpdateFollowerWatermarks(apply_up_to, request->all_replicated_index());
 
-    // We can now update the last received watermark.
-    //
-    // We do it here (and before we actually hear back from the wal whether things
-    // are durable) so that, if we receive another, possible duplicate, message
-    // that exercises this path we don't handle these messages twice.
-    //
     // If any messages failed to be started locally, then we already have removed them
-    // from 'deduped_req' at this point. So, we can simply update our last-received
-    // watermark to the last message that remains in 'deduped_req'.
-    state_->UpdateLastReceivedOpIdUnlocked(last_from_leader);
+    // from 'deduped_req' at this point. So, 'last_from_leader' is the last one that
+    // we might apply.
+    last_received_cur_leader_ = last_from_leader;
 
     // Fill the response with the current state. We will not mutate anymore state until
     // we actually reply to the leader, we'll just wait for the messages to be durable.
@@ -1269,12 +1255,11 @@ void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* respons
   TRACE("Filling consensus response to leader.");
   response->set_responder_term(state_->GetCurrentTermUnlocked());
   response->mutable_status()->mutable_last_received()->CopyFrom(
-      state_->GetLastReceivedOpIdUnlocked());
+      queue_->GetLastOpIdInLog());
   response->mutable_status()->mutable_last_received_current_leader()->CopyFrom(
-      state_->GetLastReceivedOpIdCurLeaderUnlocked());
-  // TODO: interrogate queue rather than state?
+      last_received_cur_leader_);
   response->mutable_status()->set_last_committed_idx(
-      state_->GetCommittedIndexUnlocked());
+      queue_->GetCommittedIndex());
 }
 
 void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response,
@@ -1879,7 +1864,7 @@ Status RaftConsensus::GetLastOpId(OpIdType type, OpId* id) {
   ReplicaState::UniqueLock lock;
   RETURN_NOT_OK(state_->LockForRead(&lock));
   if (type == RECEIVED_OPID) {
-    *DCHECK_NOTNULL(id) = state_->GetLastReceivedOpIdUnlocked();
+    *DCHECK_NOTNULL(id) = queue_->GetLastOpIdInLog();
   } else if (type == COMMITTED_OPID) {
     id->set_term(state_->GetTermWithLastCommittedOpUnlocked());
     id->set_index(state_->GetCommittedIndexUnlocked());
@@ -2038,6 +2023,7 @@ Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Advancing to term " << new_term;
   RETURN_NOT_OK(state_->SetCurrentTermUnlocked(new_term, flush));
   term_metric_->set_value(new_term);
+  last_received_cur_leader_ = MinimumOpId();
   return Status::OK();
 }
 

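The raft_consensus.cc hunks above move OpId generation into the queue (`queue_->GetNextOpId()`) and change `TruncateOpsAfter()` to take an index. The sketch below illustrates that idea. It is a minimal illustration under stated assumptions, not Kudu's PeerMessageQueue. `SketchQueue`, `SetCurrentTerm()` and the local `OpId` struct are hypothetical. A `std::map` stands in for the log, which the real queue consults to map an index back to an OpId.

```cpp
#include <cassert>
#include <cstdint>
#include <map>

// Hypothetical stand-in for kudu::consensus::OpId (a term plus a log index).
struct OpId {
  int64_t term = 0;
  int64_t index = 0;
};

// Hypothetical, heavily simplified queue. The std::map plays the role of the
// log: each index maps to the OpId stored there.
class SketchQueue {
 public:
  // Seed from the last OpId found in the WAL at bootstrap,
  // analogous to queue_->Init(info.last_id).
  void Init(const OpId& last_id_in_log) {
    log_.clear();
    last_in_log_ = last_id_in_log;
    if (last_id_in_log.index > 0) {
      log_[last_id_in_log.index] = last_id_in_log;
    }
  }

  void SetCurrentTerm(int64_t term) { current_term_ = term; }

  // The next id is derived from the previous OpId plus the current term, so
  // no separate counter (like the removed ReplicaState::next_index_) is kept.
  OpId GetNextOpId() const {
    return OpId{current_term_, last_in_log_.index + 1};
  }

  void AppendOperation(const OpId& id) {
    assert(id.index == last_in_log_.index + 1);  // ids must stay contiguous
    log_[id.index] = id;
    last_in_log_ = id;
  }

  // Takes an index rather than an OpId: the queue resolves the OpId itself.
  void TruncateOpsAfter(int64_t index) {
    log_.erase(log_.upper_bound(index), log_.end());
    auto it = log_.find(index);
    // Index 0 (MinimumOpId()) is never stored in this toy log, so fall back
    // to the minimum OpId whenever the index is not found.
    last_in_log_ = (it == log_.end()) ? OpId{} : it->second;
  }

  const OpId& GetLastOpIdInLog() const { return last_in_log_; }

 private:
  std::map<int64_t, OpId> log_;
  OpId last_in_log_;
  int64_t current_term_ = 0;
};
```

In this sketch, if `AppendOperation()` is never called for an id (for example, because the queue is full), the next `GetNextOpId()` call returns the same id again. That is why nothing has to be rolled back when an operation is cancelled.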
http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index ec48edb..887201b 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -459,9 +459,14 @@ class RaftConsensus : public Consensus,
   // nodes from disturbing the healthy leader.
   MonoTime withhold_votes_until_;
 
+  // The last OpId received from the current leader. This is updated whenever the follower
+  // accepts operations from a leader, and passed back so that the leader knows from what
+  // point to continue sending operations.
+  OpId last_received_cur_leader_;
+
   const Callback<void(const std::string& reason)> mark_dirty_clbk_;
 
-  // TODO hack to serialize updates due to repeated/out-of-order messages
+  // TODO(dralves) hack to serialize updates due to repeated/out-of-order messages
   // should probably be refactored out.
   //
   // Lock ordering note: If both this lock and the ReplicaState lock are to be

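The new `last_received_cur_leader_` member is reset to `MinimumOpId()` on every term advance. Between resets, it records the last op accepted from the current leader. The sketch below illustrates that bookkeeping. It is hypothetical: `FollowerSketch`, `FollowerStatus`, `AppendToLog()` and `RecordLastFromLeader()` are illustrative names, not Kudu APIs, and `AppendToLog()` merely stands in for what the queue would report through `GetLastOpIdInLog()`. The scenario in the usage mirrors the removed `TestResetRcvdFromCurrentLeaderOnNewTerm`.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for kudu::consensus::OpId.
struct OpId {
  int64_t term = 0;
  int64_t index = 0;
};

inline bool operator==(const OpId& a, const OpId& b) {
  return a.term == b.term && a.index == b.index;
}

// Hypothetical subset of the ConsensusStatusPB fields a follower reports.
struct FollowerStatus {
  OpId last_received;                 // last OpId in the local log
  OpId last_received_current_leader;  // last OpId accepted from this term's leader
};

// Hypothetical follower-side bookkeeping; not Kudu's RaftConsensus.
class FollowerSketch {
 public:
  // Analogous to HandleTermAdvanceUnlocked(): a new term may mean a new
  // leader, so forget what the previous leader sent.
  void AdvanceTerm(int64_t new_term) {
    assert(new_term > current_term_);
    current_term_ = new_term;
    last_received_cur_leader_ = OpId{};  // MinimumOpId()
  }

  // Stands in for the queue appending an op to the local log.
  void AppendToLog(const OpId& id) { last_in_log_ = id; }

  // Called after a leader request passes the log matching check, with the
  // last op that survived deduplication (the preceding OpId for a heartbeat).
  void RecordLastFromLeader(const OpId& last_from_leader) {
    last_received_cur_leader_ = last_from_leader;
  }

  FollowerStatus Status() const {
    return {last_in_log_, last_received_cur_leader_};
  }

 private:
  int64_t current_term_ = 0;
  OpId last_in_log_;
  OpId last_received_cur_leader_;
};
```

For example, after `AdvanceTerm(3)` for a new leader whose first request fails the log matching check, `Status()` still reports the last op in the log as `last_received`. `last_received_current_leader` stays at the minimum OpId until that leader's ops are accepted.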
http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 04e894a..9d63266 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -285,14 +285,9 @@ class RaftConsensusQuorumTest : public KuduTest {
   void WaitForReplicateIfNotAlreadyPresent(const OpId& to_wait_for, int peer_idx) {
     scoped_refptr<RaftConsensus> peer;
     CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer));
-    ReplicaState* state = peer->GetReplicaStateForTests();
     while (true) {
-      {
-        ReplicaState::UniqueLock lock;
-        CHECK_OK(state->LockForRead(&lock));
-        if (OpIdCompare(state->GetLastReceivedOpIdUnlocked(), to_wait_for) >= 0) {
-          return;
-        }
+      if (OpIdCompare(peer->queue_->GetLastOpIdInLog(), to_wait_for) >= 0) {
+        return;
       }
       SleepFor(MonoDelta::FromMilliseconds(1));
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/raft_consensus_state.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc
index a212fad..f99fb28 100644
--- a/src/kudu/consensus/raft_consensus_state.cc
+++ b/src/kudu/consensus/raft_consensus_state.cc
@@ -48,10 +48,7 @@ ReplicaState::ReplicaState(ConsensusOptions options, string peer_uuid,
     : options_(std::move(options)),
       peer_uuid_(std::move(peer_uuid)),
       cmeta_(std::move(cmeta)),
-      next_index_(0),
       txn_factory_(txn_factory),
-      last_received_op_id_(MinimumOpId()),
-      last_received_op_id_current_leader_(MinimumOpId()),
       last_committed_op_id_(MinimumOpId()),
       state_(kInitialized) {
   CHECK(cmeta_) << "ConsensusMeta passed as NULL";
@@ -71,9 +68,6 @@ Status ReplicaState::StartUnlocked(const OpId& last_id_in_wal) {
         GetCurrentTermUnlocked()));
   }
 
-  next_index_ = last_id_in_wal.index() + 1;
-  last_received_op_id_.CopyFrom(last_id_in_wal);
-
   state_ = kRunning;
   return Status::OK();
 }
@@ -266,13 +260,11 @@ bool ReplicaState::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch
     return true;
   }
 
-  if (op_id.index() > GetLastReceivedOpIdUnlocked().index()) {
+  scoped_refptr<ConsensusRound> round = GetPendingOpByIndexOrNullUnlocked(op_id.index());
+  if (!round) {
     return false;
   }
 
-  scoped_refptr<ConsensusRound> round = GetPendingOpByIndexOrNullUnlocked(op_id.index());
-  DCHECK(round);
-
   if (round->id().term() != op_id.term()) {
     *term_mismatch = true;
     return false;
@@ -296,7 +288,6 @@ Status ReplicaState::SetCurrentTermUnlocked(int64_t new_term,
     CHECK_OK(cmeta_->Flush());
   }
   ClearLeaderUnlocked();
-  last_received_op_id_current_leader_ = MinimumOpId();
   return Status::OK();
 }
 
@@ -405,12 +396,6 @@ void ReplicaState::AbortOpsAfterUnlocked(int64_t index) {
     new_preceding = last_committed_op_id_;
   }
 
-  // This is the same as UpdateLastReceivedOpIdUnlocked() but we do it
-  // here to avoid the bounds check, since we're breaking monotonicity.
-  last_received_op_id_ = new_preceding;
-  last_received_op_id_current_leader_ = last_received_op_id_;
-  next_index_ = new_preceding.index() + 1;
-
   for (; iter != pending_txns_.end();) {
     const scoped_refptr<ConsensusRound>& round = (*iter).second;
     auto op_type = round->replicate_msg()->op_type();
@@ -611,53 +596,19 @@ Status ReplicaState::CheckHasCommittedOpInCurrentTermUnlocked() const {
   return Status::OK();
 }
 
-void ReplicaState::UpdateLastReceivedOpIdUnlocked(const OpId& op_id) {
-  DCHECK(update_lock_.is_locked());
-  if (OpIdCompare(op_id, last_received_op_id_) > 0) {
-    TRACE("Updating last received op as $0", OpIdToString(op_id));
-    last_received_op_id_ = op_id;
-    next_index_ = op_id.index() + 1;
-  }
-  last_received_op_id_current_leader_ = op_id;
-}
-
-const OpId& ReplicaState::GetLastReceivedOpIdUnlocked() const {
-  DCHECK(update_lock_.is_locked());
-  return last_received_op_id_;
-}
-
-const OpId& ReplicaState::GetLastReceivedOpIdCurLeaderUnlocked() const {
-  DCHECK(update_lock_.is_locked());
-  return last_received_op_id_current_leader_;
-}
-
 OpId ReplicaState::GetLastPendingTransactionOpIdUnlocked() const {
   DCHECK(update_lock_.is_locked());
   return pending_txns_.empty()
       ? MinimumOpId() : (--pending_txns_.end())->second->id();
 }
 
-void ReplicaState::NewIdUnlocked(OpId* id) {
-  DCHECK(update_lock_.is_locked());
-  id->set_term(GetCurrentTermUnlocked());
-  id->set_index(next_index_++);
-}
 
 void ReplicaState::CancelPendingOperation(const OpId& id) {
   OpId previous = id;
   previous.set_index(previous.index() - 1);
   DCHECK(update_lock_.is_locked());
   CHECK_EQ(GetCurrentTermUnlocked(), id.term());
-  CHECK_EQ(next_index_, id.index() + 1);
-  next_index_ = id.index();
-
-  // We don't use UpdateLastReceivedOpIdUnlocked because we're actually
-  // updating it back to a lower value and we need to avoid the checks
-  // that method has.
 
-  // This is only ok if we do _not_ release the lock after calling
-  // NewIdUnlocked() (which we don't in RaftConsensus::Replicate()).
-  last_received_op_id_ = previous;
   scoped_refptr<ConsensusRound> round = EraseKeyReturnValuePtr(&pending_txns_, id.index());
   DCHECK(round);
 }
@@ -696,10 +647,9 @@ string ReplicaState::ToString() const {
 
 string ReplicaState::ToStringUnlocked() const {
   DCHECK(update_lock_.is_locked());
-  return Substitute("Replica: $0, State: $1, Role: $2, Watermarks: {Received: $3, Committed: $4}",
+  return Substitute("Replica: $0, State: $1, Role: $2, Last Committed: $3",
                     peer_uuid_, state_, RaftPeerPB::Role_Name(GetActiveRoleUnlocked()),
-                    last_received_op_id_.ShortDebugString(),
-                    last_committed_op_id_.ShortDebugString());
+                    OpIdToString(last_committed_op_id_));
 }
 
 Status ReplicaState::CheckOpInSequence(const OpId& previous, const OpId& current) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/raft_consensus_state.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.h b/src/kudu/consensus/raft_consensus_state.h
index e27b7b3..d5d6fe2 100644
--- a/src/kudu/consensus/raft_consensus_state.h
+++ b/src/kudu/consensus/raft_consensus_state.h
@@ -269,19 +269,6 @@ class ReplicaState {
   // Returns OK iff an op from the current term has been committed.
   Status CheckHasCommittedOpInCurrentTermUnlocked() const;
 
-  // Updates the last received operation, if 'op_id''s index is higher than
-  // the previous last received. Also updates 'last_received_from_current_leader_'
-  // regardless of whether it is higher or lower than the prior value.
-  //
-  // This must be called under a lock.
-  void UpdateLastReceivedOpIdUnlocked(const OpId& op_id);
-
-  // Returns the last received op id. This must be called under the lock.
-  const OpId& GetLastReceivedOpIdUnlocked() const;
-
-  // Returns the id of the last op received from the current leader.
-  const OpId& GetLastReceivedOpIdCurLeaderUnlocked() const;
-
   // Returns the id of the latest pending transaction (i.e. the one with the
   // latest index). This must be called under the lock.
   OpId GetLastPendingTransactionOpIdUnlocked() const;
@@ -291,8 +278,6 @@ class ReplicaState {
   // to complete. This does not cancel transactions being applied.
   Status CancelPendingTransactions();
 
-  void NewIdUnlocked(OpId* id);
-
   // Used when, for some reason, an operation that failed before it could be considered
   // a part of the state machine. Basically restores the id gen to the state it was before
   // generating 'id'.
@@ -338,10 +323,6 @@ class ReplicaState {
   // Consensus metadata persistence object.
   std::unique_ptr<ConsensusMetadata> cmeta_;
 
-  // Used by the LEADER. This is the index of the next operation generated
-  // by this LEADER.
-  int64_t next_index_;
-
   // Index=>Round map that manages pending ops, i.e. operations for which we've
   // received a replicate message from the leader but have yet to be committed.
   // The key is the index of the replicate operation.
@@ -351,22 +332,6 @@ class ReplicaState {
   // this factory to start it.
   ReplicaTransactionFactory* txn_factory_;
 
-  // The id of the last received operation, which corresponds to the last entry
-  // written to the local log. Operations whose id is lower than or equal to
-  // this id do not need to be resent by the leader. This is not guaranteed to
-  // be monotonically increasing due to the possibility for log truncation and
-  // aborted operations when a leader change occurs.
-  OpId last_received_op_id_;
-
-  // Same as last_received_op_id_ but only includes operations sent by the
-  // current leader. The "term" in this op may not actually match the current
-  // term, since leaders may replicate ops from prior terms.
-  //
-  // As an implementation detail, this field is reset to MinumumOpId() every
-  // time there is a term advancement on the local node, to simplify the logic
-  // involved in resetting this every time a new node becomes leader.
-  OpId last_received_op_id_current_leader_;
-
   // The OpId of the Apply that was last triggered when the last message from the leader
   // was received. Initialized to MinimumOpId().
   //

http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 1bbef3c..5d94a9b 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -897,10 +897,9 @@ TEST_F(RaftConsensusITest, TestChurnyElections) {
 }
 
 // The same test, except inject artificial latency when propagating notifications
-// from the queue back to consensus. This can reproduce bugs like KUDU-1078 which
-// normally only appear under high load. TODO: Re-enable once we get to the
-// bottom of KUDU-1078.
-TEST_F(RaftConsensusITest, DISABLED_TestChurnyElections_WithNotificationLatency) {
+// from the queue back to consensus. This previously reproduced bugs like KUDU-1078 which
+// normally only appear under high load.
+TEST_F(RaftConsensusITest, TestChurnyElections_WithNotificationLatency) {
   DoTestChurnyElections(WITH_NOTIFICATION_LATENCY);
 }
 

