kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aw...@apache.org
Subject [kudu] 01/02: KUDU-2612 p7: add transaction participants to tablets
Date Thu, 13 Aug 2020 23:56:44 GMT
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 14d4117afa42d0ddf75bfda1926bb2c47cc6e322
Author: Andrew Wong <awong@cloudera.com>
AuthorDate: Fri Jul 17 15:31:48 2020 -0700

    KUDU-2612 p7: add transaction participants to tablets
    
    This patch adds state tracking on transaction participants (i.e.
    tablets that are participating in a transaction) in the form of a map
    from transaction ID to transaction state.
    
    Each transaction that a tablet participates in has associated with it a
    state machine that will be controlled by the transaction status manager
    based on global transaction state maintained therein. Participant state
    tracking is plugged into the op driver framework. The details of this
    state machine are outlined in txn_participant.h.
    
    While this patch commits participant ops through Raft using the existing
    op driver framework, it doesn't replay participant ops from the WAL or
    anchor WAL segments. As such, tests are only added to test the
    replication aspect of participant op drivers, rather than testing any
    form of durability.
    
    The added state can only be accessed via internal APIs. There are no RPC
    endpoints with which the code in this patch can be reached.
    
    Change-Id: I39201ce1d911308cd28f3f4790a126e30052f3bf
    Reviewed-on: http://gerrit.cloudera.org:8080/16277
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
---
 src/kudu/consensus/consensus.proto                 |   2 +
 src/kudu/integration-tests/CMakeLists.txt          |   1 +
 .../integration-tests/txn_participant-itest.cc     | 285 ++++++++++++++++++++
 src/kudu/tablet/CMakeLists.txt                     |   3 +
 src/kudu/tablet/ops/op.h                           |   1 +
 src/kudu/tablet/ops/op_tracker.cc                  |   6 +
 src/kudu/tablet/ops/participant_op.cc              | 192 ++++++++++++++
 src/kudu/tablet/ops/participant_op.h               | 111 ++++++++
 src/kudu/tablet/tablet.h                           |  10 +-
 src/kudu/tablet/tablet_replica.cc                  |  36 ++-
 src/kudu/tablet/tablet_replica.h                   |   9 +-
 src/kudu/tablet/txn_participant-test-util.h        |  63 +++++
 src/kudu/tablet/txn_participant-test.cc            | 291 +++++++++++++++++++++
 src/kudu/tablet/txn_participant.cc                 |  76 ++++++
 src/kudu/tablet/txn_participant.h                  | 230 ++++++++++++++++
 src/kudu/tserver/tserver_admin.proto               |  26 ++
 16 files changed, 1333 insertions(+), 9 deletions(-)

diff --git a/src/kudu/consensus/consensus.proto b/src/kudu/consensus/consensus.proto
index faecd7f..611ef3f 100644
--- a/src/kudu/consensus/consensus.proto
+++ b/src/kudu/consensus/consensus.proto
@@ -94,6 +94,7 @@ enum OperationType {
   WRITE_OP = 3;
   ALTER_SCHEMA_OP = 4;
   CHANGE_CONFIG_OP = 5;
+  PARTICIPANT_OP = 6;
 }
 
 // The op driver type: indicates whether an op is being executed on a leader or
@@ -203,6 +204,7 @@ message ReplicateMsg {
   optional tserver.WriteRequestPB write_request = 5;
   optional tserver.AlterSchemaRequestPB alter_schema_request = 6;
   optional ChangeConfigRecordPB change_config_record = 7;
+  optional tserver.ParticipantRequestPB participant_request = 9;
 
   // The client's request id for this message, if it is set.
   optional rpc.RequestIdPB request_id = 8;
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index dbc7eff..d920ced 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -131,6 +131,7 @@ ADD_KUDU_TEST(tombstoned_voting-imc-itest)
 ADD_KUDU_TEST(tombstoned_voting-itest)
 ADD_KUDU_TEST(tombstoned_voting-stress-test RUN_SERIAL true)
 ADD_KUDU_TEST(token_signer-itest)
+ADD_KUDU_TEST(txn_participant-itest)
 ADD_KUDU_TEST(txn_status_table-itest)
 ADD_KUDU_TEST(location_assignment-itest
   DATA_FILES ../scripts/assign-location.py)
diff --git a/src/kudu/integration-tests/txn_participant-itest.cc b/src/kudu/integration-tests/txn_participant-itest.cc
new file mode 100644
index 0000000..c4639cd
--- /dev/null
+++ b/src/kudu/integration-tests/txn_participant-itest.cc
@@ -0,0 +1,285 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <atomic>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/wire_protocol.h"
+#include "kudu/consensus/raft_consensus.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tablet/txn_participant-test-util.h"
+#include "kudu/tablet/txn_participant.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(raft_enable_pre_election);
+DECLARE_double(leader_failure_max_missed_heartbeat_periods);
+DECLARE_int32(consensus_inject_latency_ms_in_notifications);
+DECLARE_int32(raft_heartbeat_interval_ms);
+
+using kudu::cluster::InternalMiniCluster;
+using kudu::cluster::InternalMiniClusterOptions;
+using kudu::tablet::kCommitSequence;
+using kudu::tablet::kDummyCommitTimestamp;
+using kudu::tablet::TabletReplica;
+using kudu::tablet::Txn;
+using kudu::tablet::TxnParticipant;
+using kudu::tserver::ParticipantOpPB;
+using kudu::tserver::ParticipantRequestPB;
+using kudu::tserver::ParticipantResponsePB;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace itest {
+
+namespace {
+vector<Status> RunOnReplicas(const vector<TabletReplica*>& replicas,
+                             int64_t txn_id,
+                             ParticipantOpPB::ParticipantOpType type) {
+  vector<Status> statuses(replicas.size(), Status::Incomplete(""));
+  vector<thread> threads;
+  for (int i = 0; i < replicas.size(); i++) {
+    threads.emplace_back([&, i] {
+      ParticipantResponsePB resp;
+      statuses[i] = CallParticipantOp(replicas[i], txn_id, type, kDummyCommitTimestamp, &resp);
+      if (resp.has_error()) {
+        DCHECK_OK(statuses[i]);
+        statuses[i] = StatusFromPB(resp.error().status());
+      }
+    });
+  }
+  for (auto& t : threads) {
+    t.join();
+  }
+  return statuses;
+}
+} // anonymous namespace
+
+class TxnParticipantITest : public KuduTest {
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+    InternalMiniClusterOptions opts;
+    opts.num_tablet_servers = 3;
+    cluster_.reset(new InternalMiniCluster(env_, std::move(opts)));
+    ASSERT_OK(cluster_->Start());
+    NO_FATALS(SetUpTable());
+  }
+
+  // Creates a single-tablet replicated table.
+  void SetUpTable() {
+    TestWorkload w(cluster_.get());
+    w.Setup();
+    w.Start();
+    while (w.rows_inserted() < 1) {
+      SleepFor(MonoDelta::FromMilliseconds(10));
+    }
+    w.StopAndJoin();
+  }
+
+ protected:
+  unique_ptr<InternalMiniCluster> cluster_;
+};
+
+// Test that participant ops are only applied to followers via replication from
+// leaders.
+TEST_F(TxnParticipantITest, TestReplicateParticipantOps) {
+  // Keep track of all the participant replicas, and quiesce all but one
+  // tserver so we can ensure a specific leader.
+  const int kLeaderIdx = 0;
+  vector<TabletReplica*> replicas;
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    auto* ts = cluster_->mini_tablet_server(i);
+    const auto& tablets = ts->ListTablets();
+    ASSERT_EQ(1, tablets.size());
+    scoped_refptr<TabletReplica> r;
+    ASSERT_TRUE(ts->server()->tablet_manager()->LookupTablet(tablets[0], &r));
+    replicas.emplace_back(r.get());
+    if (i != kLeaderIdx) {
+      *ts->server()->mutable_quiescing() = true;
+    }
+  }
+  ASSERT_OK(replicas[kLeaderIdx]->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
+  // Try submitting the ops on all replicas. They should succeed on the leader
+  // and fail on followers.
+  const int64_t kTxnId = 1;
+  for (const auto& op : kCommitSequence) {
+    vector<Status> statuses = RunOnReplicas(replicas, kTxnId, op);
+    for (int i = 0; i < statuses.size(); i++) {
+      const auto& s = statuses[i];
+      if (i == kLeaderIdx) {
+        ASSERT_OK(s);
+      } else {
+        ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+        ASSERT_STR_CONTAINS(s.ToString(), "not leader of this config");
+      }
+    }
+  }
+
+  // Attempt to make calls on just the followers.
+  vector<TabletReplica*> followers;
+  for (int i = 0; i < replicas.size(); i++) {
+    if (i != kLeaderIdx) {
+      followers.emplace_back(replicas[i]);
+    }
+  }
+  // Try with a transaction ID we've already tried, and with a new one. Both
+  // should fail because we aren't leader.
+  for (int txn_id = kTxnId; txn_id <= kTxnId + 1; txn_id++) {
+    for (const auto& op : kCommitSequence) {
+      vector<Status> statuses = RunOnReplicas(followers, txn_id, op);
+      for (int i = 0; i < statuses.size(); i++) {
+        const auto& s = statuses[i];
+        ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+        ASSERT_STR_CONTAINS(s.ToString(), "not leader of this config");
+      }
+    }
+  }
+  // Now try just on the leader. This should succeed.
+  for (const auto& op : kCommitSequence) {
+    vector<Status> statuses = RunOnReplicas({ replicas[kLeaderIdx] }, kTxnId + 1, op);
+    ASSERT_EQ(1, statuses.size());
+    ASSERT_OK(statuses[0]);
+  }
+}
+
+class TxnParticipantElectionStormITest : public TxnParticipantITest {
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+    // Make leader elections more frequent to get through this test a bit more
+    // quickly.
+    FLAGS_leader_failure_max_missed_heartbeat_periods = 1;
+    FLAGS_raft_heartbeat_interval_ms = 30;
+    InternalMiniClusterOptions opts;
+    opts.num_tablet_servers = 3;
+    cluster_.reset(new InternalMiniCluster(env_, std::move(opts)));
+    ASSERT_OK(cluster_->Start());
+    NO_FATALS(SetUpTable());
+  }
+};
+
+TEST_F(TxnParticipantElectionStormITest, TestFrequentElections) {
+  vector<TabletReplica*> replicas;
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    auto* ts = cluster_->mini_tablet_server(i);
+    const auto& tablets = ts->ListTablets();
+    scoped_refptr<TabletReplica> r;
+    ASSERT_TRUE(ts->server()->tablet_manager()->LookupTablet(tablets[0], &r));
+    replicas.emplace_back(r.get());
+  }
+  // Inject latency so elections become more frequent and wait a bit for our
+  // latency injection to kick in.
+  FLAGS_raft_enable_pre_election = false;
+  FLAGS_consensus_inject_latency_ms_in_notifications = 1.5 * FLAGS_raft_heartbeat_interval_ms;;
+  SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 2));
+
+  // Send a participant op to all replicas concurrently. Leaders should attempt
+  // to replicate, and followers should immediately reject the request. There
+  // will be a real chance that the leader op will not be successfully
+  // replicated because of an election change.
+  constexpr const int kNumTxns = 10;
+  vector<ParticipantOpPB::ParticipantOpType> last_successful_ops_per_txn(kNumTxns);
+  for (int txn_id = 0; txn_id < kNumTxns; txn_id++) {
+    ParticipantOpPB::ParticipantOpType last_successful_op = ParticipantOpPB::UNKNOWN;;
+    for (const auto& op : kCommitSequence) {
+      vector<thread> threads;
+      vector<Status> statuses(cluster_->num_tablet_servers(), Status::Incomplete(""));
+      for (int r = 0; r < cluster_->num_tablet_servers(); r++) {
+        threads.emplace_back([&, r] {
+          ParticipantResponsePB resp;
+          Status s = CallParticipantOp(replicas[r], txn_id, op, kDummyCommitTimestamp, &resp);
+          if (resp.has_error()) {
+            s = StatusFromPB(resp.error().status());
+          }
+          statuses[r] = s;
+        });
+      }
+      for (auto& t : threads) {
+        t.join();
+      }
+      // If this op succeeded on any replica, keep track of it -- it indicates
+      // what the final state of each transaction should be.
+      for (const auto& s : statuses) {
+        if (s.ok()) {
+          last_successful_op = op;
+        }
+      }
+      last_successful_ops_per_txn[txn_id] = last_successful_op;
+    }
+  }
+  // Validate that each replica has each transaction in the appropriate state.
+  vector<TxnParticipant::TxnEntry> expected_txns;
+  for (int txn_id = 0; txn_id < kNumTxns; txn_id++) {
+    const auto& last_successful_op = last_successful_ops_per_txn[txn_id];
+    if (last_successful_op == ParticipantOpPB::UNKNOWN) {
+      continue;
+    }
+    Txn::State expected_state;
+    switch (last_successful_op) {
+      case ParticipantOpPB::BEGIN_TXN:
+        expected_state = Txn::kOpen;
+        break;
+      case ParticipantOpPB::BEGIN_COMMIT:
+        expected_state = Txn::kCommitInProgress;
+        break;
+      case ParticipantOpPB::FINALIZE_COMMIT:
+        expected_state = Txn::kCommitted;
+        break;
+      default:
+        FAIL() << "Unexpected successful op " << last_successful_op;
+    }
+    expected_txns.emplace_back(TxnParticipant::TxnEntry({
+        txn_id, expected_state, expected_state == Txn::kCommitted ? kDummyCommitTimestamp : -1}));
+  }
+  for (int i = 0; i < replicas.size(); i++) {
+    // NOTE: We ASSERT_EVENTUALLY here because having completed the participant
+    // op only guarantees successful replication on a majority. We need to wait
+    // a bit for the state to fully quiesce.
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_EQ(expected_txns, replicas[i]->tablet()->txn_participant()->GetTxnsForTests());
+    });
+  }
+}
+
+} // namespace itest
+} // namespace kudu
diff --git a/src/kudu/tablet/CMakeLists.txt b/src/kudu/tablet/CMakeLists.txt
index 49aa362..a9a5867 100644
--- a/src/kudu/tablet/CMakeLists.txt
+++ b/src/kudu/tablet/CMakeLists.txt
@@ -22,10 +22,12 @@ set(TABLET_SRCS
   tablet_mm_ops.cc
   tablet_replica_mm_ops.cc
   tablet_replica.cc
+  txn_participant.cc
   ops/op.cc
   ops/alter_schema_op.cc
   ops/op_driver.cc
   ops/op_tracker.cc
+  ops/participant_op.cc
   ops/write_op.cc
   op_order_verifier.cc
   cfile_set.cc
@@ -125,6 +127,7 @@ ADD_KUDU_TEST(tablet_mm_ops-test)
 ADD_KUDU_TEST(tablet_random_access-test)
 ADD_KUDU_TEST(tablet_replica-test)
 ADD_KUDU_TEST(tablet_throttle-test)
+ADD_KUDU_TEST(txn_participant-test)
 
 # Some tests don't have dependencies on other tablet stuff
 SET_KUDU_TEST_LINK_LIBS(kudu_util gutil)
diff --git a/src/kudu/tablet/ops/op.h b/src/kudu/tablet/ops/op.h
index 258dd05..0f973f8 100644
--- a/src/kudu/tablet/ops/op.h
+++ b/src/kudu/tablet/ops/op.h
@@ -77,6 +77,7 @@ class Op {
   enum OpType {
     WRITE_OP,
     ALTER_SCHEMA_OP,
+    PARTICIPANT_OP,
   };
 
   enum TraceType {
diff --git a/src/kudu/tablet/ops/op_tracker.cc b/src/kudu/tablet/ops/op_tracker.cc
index 98153a2..e0575e6 100644
--- a/src/kudu/tablet/ops/op_tracker.cc
+++ b/src/kudu/tablet/ops/op_tracker.cc
@@ -194,6 +194,9 @@ void OpTracker::IncrementCounters(const OpDriver& driver) const {
     case Op::ALTER_SCHEMA_OP:
       metrics_->alter_schema_transactions_inflight->Increment();
       break;
+    case Op::PARTICIPANT_OP:
+      // TODO(awong): implement me!
+      break;
   }
 }
 
@@ -213,6 +216,9 @@ void OpTracker::DecrementCounters(const OpDriver& driver) const {
       DCHECK_GT(metrics_->alter_schema_transactions_inflight->value(), 0);
       metrics_->alter_schema_transactions_inflight->Decrement();
       break;
+    case Op::PARTICIPANT_OP:
+      // TODO(awong): implement me!
+      break;
   }
 }
 
diff --git a/src/kudu/tablet/ops/participant_op.cc b/src/kudu/tablet/ops/participant_op.cc
new file mode 100644
index 0000000..f84a833
--- /dev/null
+++ b/src/kudu/tablet/ops/participant_op.cc
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tablet/ops/participant_op.h"
+
+#include <memory>
+
+#include <glog/logging.h>
+#include <google/protobuf/arena.h>
+
+#include "kudu/clock/hybrid_clock.h"
+#include "kudu/common/timestamp.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/opid.pb.h"
+#include "kudu/consensus/raft_consensus.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/tablet/ops/op.h"
+#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tablet/txn_participant.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/trace.h"
+
+using kudu::consensus::CommitMsg;
+using kudu::consensus::ReplicateMsg;
+using kudu::consensus::OperationType;
+using kudu::pb_util::SecureShortDebugString;
+using kudu::tablet::TabletReplica;
+using kudu::tserver::ParticipantOpPB;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+
+namespace kudu {
+class rw_semaphore;
+
+namespace tablet {
+
+ParticipantOpState::ParticipantOpState(TabletReplica* tablet_replica,
+                                       const tserver::ParticipantRequestPB* request,
+                                       tserver::ParticipantResponsePB* response)
+    : OpState(tablet_replica),
+      request_(DCHECK_NOTNULL(request)),
+      response_(response) {}
+
+void ParticipantOpState::AcquireTxnAndLock() {
+  int64_t txn_id = request_->op().txn_id();
+  txn_ = tablet_replica()->tablet()->txn_participant()->GetOrCreateTransaction(txn_id);
+  txn_->AcquireWriteLock(&txn_lock_);
+}
+
+void ParticipantOpState::ReleaseTxn() {
+  if (txn_lock_.owns_lock()) {
+    txn_lock_ = std::unique_lock<rw_semaphore>();
+  }
+  txn_.reset();
+  TRACE("Released txn lock");
+}
+
+string ParticipantOpState::ToString() const {
+  const string ts_str = has_timestamp() ? timestamp().ToString() : "<unassigned>";
+  DCHECK(request_);
+  return Substitute("ParticipantOpState $0 [op_id=($1), ts=$2, type=$3]",
+      this, SecureShortDebugString(op_id()), ts_str,
+      ParticipantOpPB::ParticipantOpType_Name(request_->op().type()));
+}
+
+void ParticipantOp::NewReplicateMsg(unique_ptr<ReplicateMsg>* replicate_msg) {
+  replicate_msg->reset(new ReplicateMsg);
+  (*replicate_msg)->set_op_type(OperationType::PARTICIPANT_OP);
+  (*replicate_msg)->mutable_participant_request()->CopyFrom(*state()->request());
+  if (state()->are_results_tracked()) {
+    (*replicate_msg)->mutable_request_id()->CopyFrom(state()->request_id());
+  }
+}
+
+Status ParticipantOp::Prepare() {
+  TRACE_EVENT0("op", "ParticipantOp::Prepare");
+  TRACE("PREPARE: Starting.");
+  state_->AcquireTxnAndLock();
+  const auto& op = state_->request()->op();
+  Txn* txn = state_->txn_.get();
+  DCHECK(txn);
+  switch (op.type()) {
+    case ParticipantOpPB::BEGIN_TXN: {
+      RETURN_NOT_OK(txn->ValidateBeginTransaction());
+      break;
+    }
+    case ParticipantOpPB::BEGIN_COMMIT: {
+      RETURN_NOT_OK(txn->ValidateBeginCommit());
+      break;
+    }
+    case ParticipantOpPB::FINALIZE_COMMIT: {
+      RETURN_NOT_OK(txn->ValidateFinalize());
+      break;
+    }
+    case ParticipantOpPB::ABORT_TXN: {
+      RETURN_NOT_OK(txn->ValidateAbort());
+      break;
+    }
+    case ParticipantOpPB::UNKNOWN: {
+      return Status::InvalidArgument("unknown op type");
+    }
+  }
+  TRACE("PREPARE: Finished.");
+  return Status::OK();
+}
+
+Status ParticipantOp::Start() {
+  DCHECK(!state_->has_timestamp());
+  DCHECK(state_->consensus_round()->replicate_msg()->has_timestamp());
+  state_->set_timestamp(Timestamp(state_->consensus_round()->replicate_msg()->timestamp()));
+  TRACE("START. Timestamp: $0", clock::HybridClock::GetPhysicalValueMicros(state_->timestamp()));
+  return Status::OK();
+}
+
+Status ParticipantOp::Apply(CommitMsg** commit_msg) {
+  TRACE_EVENT0("op", "ParticipantOp::Apply");
+  TRACE("APPLY: Starting.");
+  const auto& op = state_->request()->op();
+  Txn* txn = state_->txn_.get();
+  Status s;
+  switch (op.type()) {
+    // NOTE: these can currently never fail because we are only updating
+    // metadata. When we begin validating write ops before committing, we'll
+    // need to populate the response with errors.
+    case ParticipantOpPB::BEGIN_TXN: {
+      txn->BeginTransaction();
+      break;
+    }
+    case ParticipantOpPB::BEGIN_COMMIT: {
+      txn->BeginCommit();
+      break;
+    }
+    case ParticipantOpPB::FINALIZE_COMMIT: {
+      txn->FinalizeCommit(op.finalized_commit_timestamp());
+      break;
+    }
+    case ParticipantOpPB::ABORT_TXN: {
+      txn->AbortTransaction();
+      break;
+    }
+    case ParticipantOpPB::UNKNOWN: {
+      return Status::InvalidArgument("unknown op type");
+    }
+  }
+  *commit_msg = google::protobuf::Arena::CreateMessage<CommitMsg>(state_->pb_arena());
+  (*commit_msg)->set_op_type(OperationType::PARTICIPANT_OP);
+  TRACE("APPLY: Finished.");
+  return Status::OK();
+}
+
+void ParticipantOp::Finish(OpResult result) {
+  auto txn_id = state_->request()->op().txn_id();
+  state_->ReleaseTxn();
+  TxnParticipant* txn_participant = state_->tablet_replica()->tablet()->txn_participant();
+  if (PREDICT_FALSE(result == Op::ABORTED)) {
+    txn_participant->ClearIfInitFailed(txn_id);
+    TRACE("FINISH: Op aborted");
+    return;
+  }
+  DCHECK_EQ(result, Op::COMMITTED);
+  // TODO(awong): when implementing transaction cleanup on participants, clean
+  // up finalized and aborted transactions here.
+  TRACE("FINISH: Op committed");
+}
+
+string ParticipantOp::ToString() const {
+  return Substitute("ParticipantOp [type=$0, state=$1]",
+      DriverType_Name(type()), state_->ToString());
+}
+
+} // namespace tablet
+} // namespace kudu
diff --git a/src/kudu/tablet/ops/participant_op.h b/src/kudu/tablet/ops/participant_op.h
new file mode 100644
index 0000000..12b8392
--- /dev/null
+++ b/src/kudu/tablet/ops/participant_op.h
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <utility>
+
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/tablet/ops/op.h"
+#include "kudu/tablet/txn_participant.h"
+#include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+class rw_semaphore;
+
+namespace tablet {
+class TabletReplica;
+
+// An OpState for an update to transaction participant state.
+class ParticipantOpState : public OpState {
+ public:
+  // Creates op state for the given tablet replica with the given request and
+  // response.
+  // TODO(awong): track this on the RPC results tracker.
+  ParticipantOpState(TabletReplica* tablet_replica,
+                     const tserver::ParticipantRequestPB* request,
+                     tserver::ParticipantResponsePB* response = nullptr);
+  const tserver::ParticipantRequestPB* request() const override {
+    return request_;
+  }
+  tserver::ParticipantResponsePB* response() const override {
+    return response_;
+  }
+  std::string ToString() const override;
+
+  // Takes a reference to the transaction associated with this request in the
+  // underlying tablet's transaction participant, creating the transaction if
+  // it doesn't already exist. Locks the transaction for writes.
+  void AcquireTxnAndLock();
+
+  // Releases the transaction and its lock.
+  void ReleaseTxn();
+
+  // Returns the transaction ID for this op.
+  int64_t txn_id() {
+    return request_->op().txn_id();
+  }
+ private:
+  friend class ParticipantOp;
+  const tserver::ParticipantRequestPB* request_;
+  tserver::ParticipantResponsePB* response_;
+
+  scoped_refptr<Txn> txn_;
+  std::unique_lock<rw_semaphore> txn_lock_;
+};
+
+// Op that executes a transaction state change in the transaction participant.
+class ParticipantOp : public Op {
+ public:
+  ParticipantOp(std::unique_ptr<ParticipantOpState> state,
+                consensus::DriverType type)
+      : Op(type, Op::PARTICIPANT_OP),
+        state_(std::move(state)) {}
+  OpState* state() override { return state_.get(); }
+  const OpState* state() const override { return state_.get(); }
+  void NewReplicateMsg(std::unique_ptr<consensus::ReplicateMsg>* replicate_msg) override;
+
+  // Takes a reference to the requested transaction, creating it if necessary.
+  // Locks its state and checks that the requested operation is valid.
+  Status Prepare() override;
+
+  // Sets the op's timestamp from the timestamp on its replicate message.
+  Status Start() override;
+
+  // Perform the state change.
+  Status Apply(consensus::CommitMsg** commit_msg) override;
+
+  // Release the transaction reference and the lock on its state. If this was
+  // the only op referencing the transaction and it was left in the
+  // kInitializing state (e.g. we tried to start the transaction in this op but
+  // aborted before applying), removes the transaction from those tracked by
+  // the underlying TxnParticipant.
+  void Finish(OpResult result) override;
+
+  std::string ToString() const override;
+
+ private:
+  std::unique_ptr<ParticipantOpState> state_;
+};
+
+} // namespace tablet
+} // namespace kudu
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index ab220bb..e4ef3b9 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -43,6 +43,7 @@
 #include "kudu/tablet/rowset.h"
 #include "kudu/tablet/tablet_mem_trackers.h"
 #include "kudu/tablet/tablet_metadata.h"
+#include "kudu/tablet/txn_participant.h"
 #include "kudu/util/bloom_filter.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/maintenance_manager.h"
@@ -141,7 +142,7 @@ class Tablet {
   Status DecodeWriteOperations(const Schema* client_schema,
                                WriteOpState* op_state);
 
-  // Acquire locks for each of the operations in the given txn.
+  // Acquire locks for each of the operations in the given op.
   // This also sets the row op's RowSetKeyProbe.
   Status AcquireRowLocks(WriteOpState* op_state);
 
@@ -369,9 +370,12 @@ class Tablet {
   // Return the MVCC manager for this tablet.
   MvccManager* mvcc_manager() { return &mvcc_; }
 
-  // Return the Lock Manager for this tablet
+  // Return the Lock Manager for this tablet.
   LockManager* lock_manager() { return &lock_manager_; }
 
+  // Return the transaction participant for this tablet.
+  TxnParticipant* txn_participant() { return &txn_participant_; }
+
   const TabletMetadata *metadata() const { return metadata_.get(); }
   TabletMetadata *metadata() { return metadata_.get(); }
   scoped_refptr<TabletMetadata> shared_metadata() const { return metadata_; }
@@ -694,6 +698,8 @@ class Tablet {
   // released after the schema change has been applied.
   mutable rw_semaphore schema_lock_;
 
+  TxnParticipant txn_participant_;
+
   const Schema key_schema_;
 
   scoped_refptr<TabletMetadata> metadata_;
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index 63b9a22..7052d24 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -48,6 +48,7 @@
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/ops/alter_schema_op.h"
 #include "kudu/tablet/ops/op_driver.h"
+#include "kudu/tablet/ops/participant_op.h"
 #include "kudu/tablet/ops/write_op.h"
 #include "kudu/tablet/tablet.pb.h"
 #include "kudu/tablet/tablet_replica_mm_ops.h"
@@ -115,6 +116,7 @@ using consensus::RpcPeerProxyFactory;
 using consensus::ServerContext;
 using consensus::TimeManager;
 using consensus::ALTER_SCHEMA_OP;
+using consensus::PARTICIPANT_OP;
 using consensus::WRITE_OP;
 using log::Log;
 using log::LogAnchorRegistry;
@@ -433,11 +435,21 @@ Status TabletReplica::WaitUntilConsensusRunning(const MonoDelta& timeout) {
   return Status::OK();
 }
 
-Status TabletReplica::SubmitWrite(unique_ptr<WriteOpState> state) {
+Status TabletReplica::SubmitWrite(unique_ptr<WriteOpState> op_state) {
   RETURN_NOT_OK(CheckRunning());
 
-  state->SetResultTracker(result_tracker_);
-  unique_ptr<WriteOp> op(new WriteOp(std::move(state), consensus::LEADER));
+  op_state->SetResultTracker(result_tracker_);
+  unique_ptr<WriteOp> op(new WriteOp(std::move(op_state), consensus::LEADER));
+  scoped_refptr<OpDriver> driver;
+  RETURN_NOT_OK(NewLeaderOpDriver(std::move(op), &driver));
+  return driver->ExecuteAsync();
+}
+
+Status TabletReplica::SubmitTxnParticipantOp(std::unique_ptr<ParticipantOpState> op_state) {
+  RETURN_NOT_OK(CheckRunning());
+
+  op_state->SetResultTracker(result_tracker_);
+  unique_ptr<ParticipantOp> op(new ParticipantOp(std::move(op_state), consensus::LEADER));
   scoped_refptr<OpDriver> driver;
   RETURN_NOT_OK(NewLeaderOpDriver(std::move(op), &driver));
   return driver->ExecuteAsync();
@@ -534,7 +546,7 @@ string TabletReplica::HumanReadableState() const {
 }
 
 void TabletReplica::GetInFlightOps(Op::TraceType trace_type,
-                                            vector<consensus::OpStatusPB>* out) const {
+                                   vector<consensus::OpStatusPB>* out) const {
   vector<scoped_refptr<OpDriver> > pending_ops;
   op_tracker_.GetPendingOps(&pending_ops);
   for (const scoped_refptr<OpDriver>& driver : pending_ops) {
@@ -548,6 +560,9 @@ void TabletReplica::GetInFlightOps(Op::TraceType trace_type,
         case Op::ALTER_SCHEMA_OP:
           status_pb.set_op_type(consensus::ALTER_SCHEMA_OP);
           break;
+        case Op::PARTICIPANT_OP:
+          status_pb.set_op_type(consensus::PARTICIPANT_OP);
+          break;
       }
       status_pb.set_description(driver->ToString());
       int64_t running_for_micros =
@@ -637,10 +652,21 @@ Status TabletReplica::StartFollowerOp(const scoped_refptr<ConsensusRound>& round
               &replicate_msg->write_request(),
               replicate_msg->has_request_id() ? &replicate_msg->request_id() : nullptr));
       op_state->SetResultTracker(result_tracker_);
-
       op.reset(new WriteOp(std::move(op_state), consensus::REPLICA));
       break;
     }
+    case PARTICIPANT_OP:
+    {
+      DCHECK(replicate_msg->has_participant_request()) << "PARTICIPANT_OP replica"
+          " op must receive an ParticipantRequestPB";
+      unique_ptr<ParticipantOpState> op_state(
+          new ParticipantOpState(
+              this,
+              &replicate_msg->participant_request()));
+      op_state->SetResultTracker(result_tracker_);
+      op.reset(new ParticipantOp(std::move(op_state), consensus::REPLICA));
+      break;
+    }
     case ALTER_SCHEMA_OP:
     {
       DCHECK(replicate_msg->has_alter_schema_request()) << "ALTER_SCHEMA_OP replica"
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index 35bcd2d..9d643a3 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -73,6 +73,7 @@ class ResultTracker;
 namespace tablet {
 class AlterSchemaOpState;
 class OpDriver;
+class ParticipantOpState;
 class TabletStatusPB;
 class TxnCoordinator;
 class TxnCoordinatorFactory;
@@ -132,11 +133,15 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   Status WaitUntilConsensusRunning(const MonoDelta& timeout);
 
   // Submits a write to a tablet and executes it asynchronously.
-  // The caller is expected to build and pass a OpContext that points
-  // to the RPC WriteRequest, WriteResponse, RpcContext and to the tablet's
+  // The caller is expected to build and pass a WriteOpState that points to the
+  // RPC WriteRequest, WriteResponse, RpcContext and to the tablet's
   // MvccManager.
   Status SubmitWrite(std::unique_ptr<WriteOpState> op_state);
 
+  // Submits an op to update transaction participant state, executing it
+  // asynchronously.
+  Status SubmitTxnParticipantOp(std::unique_ptr<ParticipantOpState> op_state);
+
   // Called by the tablet service to start an alter schema op.
   //
   // The op contains all the information required to execute the
diff --git a/src/kudu/tablet/txn_participant-test-util.h b/src/kudu/tablet/txn_participant-test-util.h
new file mode 100644
index 0000000..43268ef
--- /dev/null
+++ b/src/kudu/tablet/txn_participant-test-util.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <vector>
+
+#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tablet/ops/participant_op.h"
+#include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace tablet {
+
+constexpr const int64_t kDummyCommitTimestamp = 1337;
+const std::vector<tserver::ParticipantOpPB::ParticipantOpType> kCommitSequence = {
+  tserver::ParticipantOpPB::BEGIN_TXN,
+  tserver::ParticipantOpPB::BEGIN_COMMIT,
+  tserver::ParticipantOpPB::FINALIZE_COMMIT,
+};
+
+Status CallParticipantOp(TabletReplica* replica,
+                         int64_t txn_id,
+                         tserver::ParticipantOpPB::ParticipantOpType type,
+                         int64_t finalized_commit_timestamp,
+                         tserver::ParticipantResponsePB* resp) {
+  tserver::ParticipantRequestPB req;
+  auto* op = req.mutable_op();
+  op->set_txn_id(txn_id);
+  op->set_type(type);
+  if (type == tserver::ParticipantOpPB::FINALIZE_COMMIT) {
+    op->set_finalized_commit_timestamp(finalized_commit_timestamp);
+  }
+  std::unique_ptr<ParticipantOpState> op_state(new ParticipantOpState(
+      replica,
+      &req,
+      resp));
+  CountDownLatch latch(1);
+  op_state->set_completion_callback(std::unique_ptr<OpCompletionCallback>(
+      new LatchOpCompletionCallback<tserver::ParticipantResponsePB>(&latch, resp)));
+  RETURN_NOT_OK(replica->SubmitTxnParticipantOp(std::move(op_state)));
+  latch.Wait();
+  return Status::OK();
+}
+
+} // namespace tablet
+} // namespace kudu
diff --git a/src/kudu/tablet/txn_participant-test.cc b/src/kudu/tablet/txn_participant-test.cc
new file mode 100644
index 0000000..ca58f84
--- /dev/null
+++ b/src/kudu/tablet/txn_participant-test.cc
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tablet/txn_participant.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/common/common.pb.h"
+#include "kudu/common/schema.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/consensus/raft_consensus.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_replica-test-base.h"
+#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tablet/txn_participant-test-util.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using kudu::consensus::ConsensusBootstrapInfo;
+using kudu::pb_util::SecureShortDebugString;
+using kudu::tserver::ParticipantRequestPB;
+using kudu::tserver::ParticipantResponsePB;
+using kudu::tserver::ParticipantOpPB;
+using std::map;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+
+namespace kudu {
+namespace tablet {
+
+class TxnParticipantTest : public TabletReplicaTestBase {
+ public:
+  TxnParticipantTest()
+      : TabletReplicaTestBase(Schema({ ColumnSchema("key", INT32) }, 1)) {}
+
+  void SetUp() override {
+    NO_FATALS(TabletReplicaTestBase::SetUp());
+    ConsensusBootstrapInfo info;
+    ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
+  }
+
+  const TxnParticipant* txn_participant() const {
+    return tablet_replica_->tablet()->txn_participant();
+  }
+};
+
+TEST_F(TxnParticipantTest, TestSuccessfulSequences) {
+  const auto check_valid_sequence = [&] (const vector<ParticipantOpPB::ParticipantOpType>& ops,
+                                         int64_t txn_id) {
+    for (const auto& type : ops) {
+      ParticipantResponsePB resp;
+      ASSERT_OK(CallParticipantOp(
+          tablet_replica_.get(), txn_id, type, kDummyCommitTimestamp, &resp));
+      SCOPED_TRACE(SecureShortDebugString(resp));
+      ASSERT_FALSE(resp.has_error());
+      ASSERT_TRUE(resp.has_timestamp());
+    }
+  };
+  // Check the happy path where the transaction is committed.
+  NO_FATALS(check_valid_sequence({
+      ParticipantOpPB::BEGIN_TXN,
+      ParticipantOpPB::BEGIN_COMMIT,
+      ParticipantOpPB::FINALIZE_COMMIT,
+  }, 0));
+
+  // Check the case where a transaction is aborted after beginning to commit.
+  NO_FATALS(check_valid_sequence({
+      ParticipantOpPB::BEGIN_TXN,
+      ParticipantOpPB::BEGIN_COMMIT,
+      ParticipantOpPB::ABORT_TXN,
+  }, 1));
+
+  // Check the case where a transaction is aborted after starting but before
+  // committing.
+  NO_FATALS(check_valid_sequence({
+      ParticipantOpPB::BEGIN_TXN,
+      ParticipantOpPB::ABORT_TXN,
+  }, 2));
+  ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
+      { 0, Txn::kCommitted, kDummyCommitTimestamp },
+      { 1, Txn::kAborted, -1 },
+      { 2, Txn::kAborted, -1 },
+  }), txn_participant()->GetTxnsForTests());
+}
+
+TEST_F(TxnParticipantTest, TestTransactionNotFound) {
+  const auto check_bad_ops = [&] (const vector<ParticipantOpPB::ParticipantOpType>& ops,
+                                       int64_t txn_id) {
+    for (const auto& type : ops) {
+      ParticipantResponsePB resp;
+      ASSERT_OK(CallParticipantOp(
+          tablet_replica_.get(), txn_id, type, kDummyCommitTimestamp, &resp));
+      SCOPED_TRACE(SecureShortDebugString(resp));
+      ASSERT_TRUE(resp.has_error());
+      ASSERT_TRUE(resp.error().has_status());
+      ASSERT_EQ(AppStatusPB::NOT_FOUND, resp.error().status().code());
+      ASSERT_FALSE(resp.has_timestamp());
+    }
+  };
+  NO_FATALS(check_bad_ops({
+    ParticipantOpPB::BEGIN_COMMIT,
+    ParticipantOpPB::FINALIZE_COMMIT,
+    ParticipantOpPB::ABORT_TXN,
+  }, 1));
+  ASSERT_TRUE(txn_participant()->GetTxnsForTests().empty());
+}
+
+TEST_F(TxnParticipantTest, TestIllegalTransitions) {
+  const int64_t kTxnId = 1;
+  const auto check_valid_op = [&] (const ParticipantOpPB::ParticipantOpType& type, int64_t txn_id) {
+    ParticipantResponsePB resp;
+    ASSERT_OK(CallParticipantOp(
+        tablet_replica_.get(), txn_id, type, kDummyCommitTimestamp, &resp));
+    SCOPED_TRACE(SecureShortDebugString(resp));
+    ASSERT_FALSE(resp.has_error());
+    ASSERT_TRUE(resp.has_timestamp());
+  };
+  const auto check_bad_ops = [&] (const vector<ParticipantOpPB::ParticipantOpType>& ops,
+                                       int64_t txn_id) {
+    for (const auto& type : ops) {
+      ParticipantResponsePB resp;
+      ASSERT_OK(CallParticipantOp(
+          tablet_replica_.get(), txn_id, type, kDummyCommitTimestamp, &resp));
+      SCOPED_TRACE(SecureShortDebugString(resp));
+      ASSERT_TRUE(resp.has_error());
+      ASSERT_TRUE(resp.error().has_status());
+      ASSERT_EQ(AppStatusPB::ILLEGAL_STATE, resp.error().status().code());
+      ASSERT_FALSE(resp.has_timestamp());
+    }
+  };
+  // Once we've begun the transaction, we can't finalize without beginning to
+  // commit.
+  NO_FATALS(check_valid_op(ParticipantOpPB::BEGIN_TXN, kTxnId));
+  NO_FATALS(check_bad_ops({ ParticipantOpPB::FINALIZE_COMMIT }, kTxnId));
+  ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
+      { kTxnId, Txn::kOpen, -1 },
+  }), txn_participant()->GetTxnsForTests());
+
+  // Once we begin committing, we can't start the transaction again.
+  NO_FATALS(check_valid_op(ParticipantOpPB::BEGIN_COMMIT, kTxnId));
+  NO_FATALS(check_bad_ops({ ParticipantOpPB::BEGIN_TXN }, kTxnId));
+  ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
+      { kTxnId, Txn::kCommitInProgress, -1 },
+  }), txn_participant()->GetTxnsForTests());
+
+  // Once we've begun finalizing, we can't do anything.
+  NO_FATALS(check_valid_op(ParticipantOpPB::FINALIZE_COMMIT, kTxnId));
+  NO_FATALS(check_bad_ops({ ParticipantOpPB::BEGIN_TXN,
+                            ParticipantOpPB::BEGIN_COMMIT,
+                            ParticipantOpPB::ABORT_TXN }, kTxnId));
+  ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
+      { kTxnId, Txn::kCommitted, kDummyCommitTimestamp },
+  }), txn_participant()->GetTxnsForTests());
+
+  // Once we've aborted, we can't do anything.
+  const int64_t kAbortedTxnId = 2;
+  NO_FATALS(check_valid_op(ParticipantOpPB::BEGIN_TXN, kAbortedTxnId));
+  NO_FATALS(check_valid_op(ParticipantOpPB::ABORT_TXN, kAbortedTxnId));
+  NO_FATALS(check_bad_ops({ ParticipantOpPB::BEGIN_TXN,
+                            ParticipantOpPB::BEGIN_COMMIT,
+                            ParticipantOpPB::FINALIZE_COMMIT }, kAbortedTxnId));
+  ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
+      { kTxnId, Txn::kCommitted, kDummyCommitTimestamp },
+      { kAbortedTxnId, Txn::kAborted, -1 },
+  }), txn_participant()->GetTxnsForTests());
+}
+
+// Test that we have no trouble operating on separate transactions.
+TEST_F(TxnParticipantTest, TestConcurrentTransactions) {
+  const int kNumTxns = 10;
+  vector<thread> threads;
+  Status statuses[kNumTxns];
+  for (int i = 0; i < kNumTxns; i++) {
+    threads.emplace_back([&, i] {
+      for (const auto& type : kCommitSequence) {
+        ParticipantResponsePB resp;
+        Status s = CallParticipantOp(
+            tablet_replica_.get(), i, type, kDummyCommitTimestamp, &resp);
+        if (s.ok() && resp.has_error()) {
+          s = StatusFromPB(resp.error().status());
+        }
+        statuses[i] = s;
+      }
+    });
+  }
+  std::for_each(threads.begin(), threads.end(), [] (thread& t) { t.join(); });
+  for (const auto& s : statuses) {
+    EXPECT_OK(s);
+  }
+  const auto& txns = txn_participant()->GetTxnsForTests();
+  for (int i = 0; i < kNumTxns; i++) {
+    ASSERT_EQ(TxnParticipant::TxnEntry({ i, Txn::kCommitted, kDummyCommitTimestamp }), txns[i]);
+  }
+}
+
+// Concurrently try to apply every op and test, based on the results, that some
+// invariants are maintained.
+TEST_F(TxnParticipantTest, TestConcurrentOps) {
+  const int64_t kTxnId = 1;
+  const map<ParticipantOpPB::ParticipantOpType, int> kIndexByOps = {
+    { ParticipantOpPB::BEGIN_TXN, 0 },
+    { ParticipantOpPB::BEGIN_COMMIT, 1},
+    { ParticipantOpPB::FINALIZE_COMMIT, 2},
+    { ParticipantOpPB::ABORT_TXN, 3},
+  };
+  vector<thread> threads;
+  vector<Status> statuses(kIndexByOps.size(), Status::Incomplete(""));
+  for (const auto& op_and_idx : kIndexByOps) {
+    const auto& op_type = op_and_idx.first;
+    const auto& idx = op_and_idx.second;
+    threads.emplace_back([&, op_type, idx] {
+      ParticipantResponsePB resp;
+      Status s = CallParticipantOp(
+          tablet_replica_.get(), kTxnId, op_type, kDummyCommitTimestamp, &resp);
+      if (s.ok() && resp.has_error()) {
+         s = StatusFromPB(resp.error().status());
+      }
+      statuses[idx] = s;
+    });
+  }
+  std::for_each(threads.begin(), threads.end(), [] (thread& t) { t.join(); });
+  const auto status_for_op = [&] (ParticipantOpPB::ParticipantOpType type) {
+    return statuses[FindOrDie(kIndexByOps, type)];
+  };
+  // Regardless of order, we should have been able to begin the transaction.
+  ASSERT_OK(status_for_op(ParticipantOpPB::BEGIN_TXN));
+
+  // If we finalized the commit, we should have begun committing, and we must
+  // not have been able to abort.
+  if (status_for_op(ParticipantOpPB::FINALIZE_COMMIT).ok()) {
+    ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
+        { kTxnId, Txn::kCommitted, kDummyCommitTimestamp },
+    }), txn_participant()->GetTxnsForTests());
+    ASSERT_OK(statuses[FindOrDie(kIndexByOps, ParticipantOpPB::BEGIN_COMMIT)]);
+    ASSERT_FALSE(statuses[FindOrDie(kIndexByOps, ParticipantOpPB::ABORT_TXN)].ok());
+
+  // If we aborted the commit, we could not have finalized the commit.
+  } else if (status_for_op(ParticipantOpPB::ABORT_TXN).ok()) {
+    ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
+        { kTxnId, Txn::kAborted, -1 },
+    }), txn_participant()->GetTxnsForTests());
+    ASSERT_FALSE(statuses[FindOrDie(kIndexByOps, ParticipantOpPB::FINALIZE_COMMIT)].ok());
+
+  // If we neither aborted nor finalized, but we began to commit, we should be
+  // left with the commit in progress.
+  } else if (status_for_op(ParticipantOpPB::BEGIN_COMMIT).ok()) {
+    ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
+        { kTxnId, Txn::kCommitInProgress, -1 },
+    }), txn_participant()->GetTxnsForTests());
+
+  // Finally, if nothing else succeeded, at least we should have been able to
+  // start the transaction.
+  } else {
+    ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
+        { kTxnId, Txn::kOpen, -1 },
+    }), txn_participant()->GetTxnsForTests());
+  }
+}
+
+} // namespace tablet
+} // namespace kudu
diff --git a/src/kudu/tablet/txn_participant.cc b/src/kudu/tablet/txn_participant.cc
new file mode 100644
index 0000000..a3ef237
--- /dev/null
+++ b/src/kudu/tablet/txn_participant.cc
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tablet/txn_participant.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace tablet {
+
+void Txn::AcquireWriteLock(std::unique_lock<rw_semaphore>* txn_lock) {
+  std::unique_lock<rw_semaphore> l(state_lock_);
+  *txn_lock = std::move(l);
+}
+
+scoped_refptr<Txn> TxnParticipant::GetOrCreateTransaction(int64_t txn_id) {
+  // TODO(awong): add a 'user' field to these transactions.
+  std::lock_guard<simple_spinlock> l(lock_);
+  return LookupOrInsertNewSharedPtr(&txns_, txn_id);
+}
+
+void TxnParticipant::ClearIfInitFailed(int64_t txn_id) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  Txn* txn = FindPointeeOrNull(txns_, txn_id);
+  // NOTE: If this is the only reference to the transaction, we can forego
+  // locking the state.
+  if (txn && txn->HasOneRef() && txn->state() == Txn::kInitializing) {
+    txns_.erase(txn_id);
+  }
+}
+
+vector<TxnParticipant::TxnEntry> TxnParticipant::GetTxnsForTests() const {
+  vector<TxnEntry> txns;
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    for (const auto& txn_id_and_scoped_txn : txns_) {
+      const auto& scoped_txn = txn_id_and_scoped_txn.second;
+      txns.emplace_back(TxnEntry{
+        txn_id_and_scoped_txn.first,
+        scoped_txn->state(),
+        scoped_txn->commit_timestamp(),
+      });
+    }
+  }
+  std::sort(txns.begin(), txns.end(),
+      [] (const TxnEntry& a, const TxnEntry& b) { return a.txn_id < b.txn_id; });
+  return txns;
+}
+
+} // namespace tablet
+} // namespace kudu
diff --git a/src/kudu/tablet/txn_participant.h b/src/kudu/tablet/txn_participant.h
new file mode 100644
index 0000000..c2c91c9
--- /dev/null
+++ b/src/kudu/tablet/txn_participant.h
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <atomic>
+#include <cstdint>
+#include <mutex>
+#include <unordered_map>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/rw_semaphore.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace tablet {
+
+// Tracks the state associated with a transaction.
+//
+// This class will primarily be accessed via op drivers. As such, locking
+// primitives are exposed publicly, to be called in different stages of
+// replication.
+class Txn : public RefCountedThreadSafe<Txn> {
+ public:
+  enum State {
+    // Each transaction starts in this state. While in this state, the
+    // transaction is not yet ready to be used, e.g. the initial op to begin
+    // the transaction may not have successfully replicated yet.
+    kInitializing,
+
+    // Each transaction is moved into this state once it is ready to begin
+    // accepting ops.
+    kOpen,
+
+    // A transaction is moved into this state when a client has signified the
+    // intent to begin committing it. While in this state, the transaction may
+    // not accept new ops.
+    kCommitInProgress,
+
+    // A transaction is moved into this state when it becomes finalized -- all
+    // participants have acknowledged the intent to commit and have guaranteed
+    // that all ops in the transaction will succeed. While in this state, the
+    // transaction may not accept new ops and may not be aborted.
+    kCommitted,
+
+    // A transaction is moved into this state when a client has signified
+    // intent to cancel the transaction. While in this state, the transaction
+    // may not accept new ops, begin committing, or finalize a commit.
+    kAborted,
+  };
+  static const char* StateToString(State s) {
+    switch (s) {
+      case kInitializing: return "INITIALIZING";
+      case kOpen: return "OPEN";
+      case kCommitInProgress: return "COMMIT_IN_PROGRESS";
+      case kCommitted: return "COMMITTED";
+      case kAborted: return "ABORTED";
+    }
+    __builtin_unreachable();
+  }
+
+  Txn() : state_(kInitializing), commit_timestamp_(-1) {}
+
+  // Takes the state lock in write mode and returns it. As transaction state is
+  // meant to be driven via an op driver, lock acquisition is expected to be
+  // serialized in a single thread.
+  void AcquireWriteLock(std::unique_lock<rw_semaphore>* txn_lock);
+
+  // Validates that the transaction is in the appropriate state to perform the
+  // given operation. Should be called while holding the state lock before
+  // replicating a participant op.
+  Status ValidateBeginTransaction() const {
+    DCHECK(state_lock_.is_locked());
+    if (PREDICT_FALSE(state_ != kInitializing)) {
+      return Status::IllegalState(
+          strings::Substitute("Cannot begin transaction in state: $0",
+                              StateToString(state_)));
+    }
+    return Status::OK();
+  }
+  Status ValidateBeginCommit() const {
+    DCHECK(state_lock_.is_locked());
+    RETURN_NOT_OK(CheckFinishedInitializing());
+    if (PREDICT_FALSE(state_ != kOpen)) {
+      return Status::IllegalState(
+          strings::Substitute("Cannot begin committing transaction in state: $0",
+                              StateToString(state_)));
+    }
+    return Status::OK();
+  }
+  Status ValidateFinalize() const {
+    DCHECK(state_lock_.is_locked());
+    RETURN_NOT_OK(CheckFinishedInitializing());
+    if (PREDICT_FALSE(state_ != kCommitInProgress)) {
+      return Status::IllegalState(
+          strings::Substitute("Cannot finalize transaction in state: $0",
+                              StateToString(state_)));
+    }
+    return Status::OK();
+  }
+  Status ValidateAbort() const {
+    DCHECK(state_lock_.is_locked());
+    RETURN_NOT_OK(CheckFinishedInitializing());
+    if (PREDICT_FALSE(state_ != kOpen &&
+                      state_ != kCommitInProgress)) {
+      return Status::IllegalState(
+          strings::Substitute("Cannot abort transaction in state: $0",
+                              StateToString(state_)));
+    }
+    return Status::OK();
+  }
+
+  // Applies the given state transitions. Should be called while holding the
+  // state lock in write mode after successfully replicating a participant op.
+  void BeginTransaction() {
+    SetState(kOpen);
+  }
+  void BeginCommit() {
+    SetState(kCommitInProgress);
+  }
+  void FinalizeCommit(int64_t finalized_commit_timestamp) {
+    SetState(kCommitted);
+    commit_timestamp_ = finalized_commit_timestamp;
+  }
+  void AbortTransaction() {
+    SetState(kAborted);
+  }
+
+  // Simple accessors for state. No locks are required to call these.
+  State state() const {
+    return state_;
+  }
+  int64_t commit_timestamp() const {
+    return commit_timestamp_;
+  }
+
+ private:
+  friend class RefCountedThreadSafe<Txn>;
+
+  // Sets the transaction state.
+  void SetState(State s) {
+    DCHECK(state_lock_.is_write_locked());
+    state_ = s;
+  }
+
+  // Returns an error if the transaction has not finished initializing.
+  Status CheckFinishedInitializing() const {
+    if (PREDICT_FALSE(state_ == kInitializing)) {
+      return Status::NotFound("Transaction hasn't been successfully started");
+    }
+    return Status::OK();
+  }
+
+  // Lock protecting access to 'state_' and 'commit_timestamp_'. Ops that intend
+  // to mutate 'state_' must take this lock in write mode. Ops that intend to
+  // read 'state_' and rely on it remaining constant must take this lock in
+  // read mode.
+  mutable rw_semaphore state_lock_;
+  std::atomic<State> state_;
+
+  // If this transaction was successfully committed, the timestamp at which the
+  // transaction should be applied, and -1 otherwise.
+  std::atomic<int64_t> commit_timestamp_;
+
+  DISALLOW_COPY_AND_ASSIGN(Txn);
+};
+
+// Tracks the on-going transactions in which a given tablet is participating.
+class TxnParticipant {
+ public:
+  // Convenience struct representing a Txn of this participant. This is useful
+  // for testing, as it is easy to construct.
+  struct TxnEntry {
+    int64_t txn_id;
+    Txn::State state;
+    int64_t commit_timestamp;
+  };
+
+  // Gets the transaction state for the given transaction ID, creating it in
+  // the kInitializing state if one doesn't already exist.
+  scoped_refptr<Txn> GetOrCreateTransaction(int64_t txn_id);
+
+  // Removes the given transaction if it failed to initialize, e.g. the op that
+  // created it failed to replicate, leaving it in the kInitializing state but
+  // with no op actively mutating it.
+  //
+  // It is expected that the caller, e.g. a ParticipantOp, has released any Txn
+  // references before calling this, ensuring that when we check the state of
+  // the Txn, we can thread-safely determine whether it has been abandoned.
+  void ClearIfInitFailed(int64_t txn_id);
+
+  // Returns the transactions, sorted by transaction ID.
+  std::vector<TxnEntry> GetTxnsForTests() const;
+
+ private:
+  // Protects insertions and removals from 'txns_'.
+  mutable simple_spinlock lock_;
+
+  // Maps from transaction ID to the corresponding transaction state.
+  std::unordered_map<int64_t, scoped_refptr<Txn>> txns_;
+};
+
+inline bool operator==(const TxnParticipant::TxnEntry& lhs, const TxnParticipant::TxnEntry& rhs) {
+  return lhs.txn_id == rhs.txn_id &&
+      lhs.state == rhs.state &&
+      lhs.commit_timestamp == rhs.commit_timestamp;
+}
+
+} // namespace tablet
+} // namespace kudu
diff --git a/src/kudu/tserver/tserver_admin.proto b/src/kudu/tserver/tserver_admin.proto
index 7d2432c..e7e2177 100644
--- a/src/kudu/tserver/tserver_admin.proto
+++ b/src/kudu/tserver/tserver_admin.proto
@@ -72,6 +72,32 @@ message CoordinateTransactionResponsePB {
   optional CoordinatorOpResultPB op_result = 2;
 }
 
+message ParticipantOpPB {
+  enum ParticipantOpType {
+    UNKNOWN = 0;
+    BEGIN_TXN = 1;
+    BEGIN_COMMIT = 2;
+    FINALIZE_COMMIT = 3;
+    ABORT_TXN = 4;
+  }
+  optional ParticipantOpType type = 1;
+  optional int64 txn_id = 2;
+
+  // Only set if 'type' is FINALIZE_COMMIT.
+  optional int64 finalized_commit_timestamp = 3;
+}
+
+message ParticipantRequestPB {
+  optional ParticipantOpPB op = 1;
+}
+
+message ParticipantResponsePB {
+  optional TabletServerErrorPB error = 1;
+
+  // The timestamp chosen by the server for this participant op.
+  optional fixed64 timestamp = 2;
+}
+
 message AlterSchemaRequestPB {
   // UUID of server this request is addressed to.
   optional bytes dest_uuid = 5;


Mime
View raw message