kudu-commits mailing list archives

From aw...@apache.org
Subject [kudu] 02/02: KUDU-2690: don't roll log schema on failed alter
Date Tue, 26 Feb 2019 22:05:18 GMT
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch branch-1.9.x
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit a2f0ba7474503a8b6fed16011c6b3fb875e343d2
Author: Andrew Wong <awong@cloudera.com>
AuthorDate: Thu Feb 7 14:33:13 2019 -0800

    KUDU-2690: don't roll log schema on failed alter
    
    It is possible to update the log segment header schema version based on
    an AlterSchema operation that failed. An alter operation that doesn't
    take effect (e.g. because of a schema version mismatch) is still treated
    as a successful transaction (similar to how we treat a duplicated insert
    transaction as a successful transaction), so it can mistakenly lead to
    updating the log segment schema version even though the alter failed.
    This can lead to a mismatch between the schema in a log segment's header
    and the schemas of the write ops in that segment, which can cause
    bootstrapping to fail.
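The mismatch described above can be illustrated with a minimal, self-contained model. This is not Kudu code: `SegmentHeader`, `WriteOp`, and `ReplaySegment` are hypothetical names, and a schema is modeled as a plain set of column names. Replay rejects any write that references a column missing from its segment header's schema, which is roughly the kind of mismatch that makes bootstrap fail.

```cpp
#include <set>
#include <string>
#include <vector>

// Hypothetical model: a schema is just a set of column names.
using Schema = std::set<std::string>;

// Hypothetical stand-in for a log segment header: the schema (and its
// version) that the segment's write ops are expected to be decoded with.
struct SegmentHeader {
  int schema_version;
  Schema schema;
};

// Hypothetical write op: the names of the columns it sets.
struct WriteOp {
  std::vector<std::string> columns;
};

// Returns false if any write references a column that the header's schema
// does not know about, i.e. the segment could not be replayed.
bool ReplaySegment(const SegmentHeader& header, const std::vector<WriteOp>& ops) {
  for (const WriteOp& op : ops) {
    for (const std::string& col : op.columns) {
      if (header.schema.count(col) == 0) {
        return false;
      }
    }
  }
  return true;
}
```

In this model, a header whose schema was rolled back to `{key}` by a failed alter cannot replay writes that set `new_col`, while a header carrying the altered schema can.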
    
    This patch addresses this by:
    1. making the alter a no-op on the tablet (including not updating the
       log segment schema) if the tablet sees that the alter didn't go
       through, and
    2. storing the error in the commit message, so further bootstraps will
       skip over the op (this is KUDU-860).
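The first part of the fix can be sketched as follows, under simplifying assumptions. `ToyTablet`, `ToyLog`, `AlterState`, `AlterTablet`, and `ApplyAlter` are hypothetical names that only mirror the shape of Tablet::AlterSchema() and AlterSchemaTransaction::Apply() in the patch, and `std::optional<std::string>` stands in for the `boost::optional<OperationResultPB>` the patch actually uses. A stale alter is not a hard failure, but its error is recorded and the log segment schema is left alone.

```cpp
#include <optional>
#include <string>

// Hypothetical, simplified model; not Kudu's real types.
struct AlterState {
  int requested_version;
  std::optional<std::string> error;  // Set only if the alter was skipped.
};

struct ToyTablet {
  int schema_version = 0;
};

struct ToyLog {
  int next_segment_schema_version = 0;
};

// Like Tablet::AlterSchema() in the patch: if the tablet is already at or
// past the requested version, record an error instead of failing outright.
void AlterTablet(ToyTablet* tablet, AlterState* state) {
  if (tablet->schema_version >= state->requested_version) {
    state->error = "Skipping requested alter to schema version " +
                   std::to_string(state->requested_version);
    return;
  }
  tablet->schema_version = state->requested_version;
}

// Like AlterSchemaTransaction::Apply() in the patch: only roll the next log
// segment's schema forward if the alter actually took effect. Returns true
// if the commit message would carry a failed result.
bool ApplyAlter(ToyTablet* tablet, ToyLog* log, AlterState* state) {
  AlterTablet(tablet, state);
  if (state->error) {
    return true;  // Error goes into the commit message; log schema untouched.
  }
  log->next_segment_schema_version = state->requested_version;
  return false;
}
```

Applying a stale alter after a successful one leaves both the tablet and the next log segment at the newer schema version, which is the invariant the patch restores.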
    
    Only the former is necessary to prevent the issue, and though the latter
    uses extra space, it may be helpful for added visibility and debugging.
    
    This patch adds a unit test that reproduces the scenario at the
    TabletReplica level, as well as a less targeted test that uses the same
    principle to test that we update the log segment schema upon replaying
    an alter. A more end-to-end test that arrives at the reported state by
    working around master-level table locking can be found at [1].
    
    [1] https://gist.github.com/andrwng/3a049bb038680cc0254c5ba52b9a7507
    
    Change-Id: Id761851741297e29a4666bec0c34fc4f7285f715
    Reviewed-on: http://gerrit.cloudera.org:8080/12462
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Reviewed-by: Todd Lipcon <todd@apache.org>
    (cherry picked from commit 16d9a863d0f31069965aee1a71912a65339a4160)
    Reviewed-on: http://gerrit.cloudera.org:8080/12595
    Tested-by: Andrew Wong <awong@cloudera.com>
---
 src/kudu/consensus/log.cc                          |   2 +
 src/kudu/tablet/tablet.cc                          |  13 +-
 src/kudu/tablet/tablet_bootstrap.cc                |  37 +++-
 src/kudu/tablet/tablet_replica-test.cc             | 226 ++++++++++++++++++---
 .../transactions/alter_schema_transaction.cc       |  43 ++--
 .../tablet/transactions/alter_schema_transaction.h |  57 +++---
 6 files changed, 296 insertions(+), 82 deletions(-)

diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index c61fcde..43b11a1 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -933,6 +933,8 @@ int64_t Log::OnDiskSize() {
 
 void Log::SetSchemaForNextLogSegment(const Schema& schema,
                                      uint32_t version) {
+  VLOG_WITH_PREFIX(2) << Substitute("Setting schema version $0 for next log segment $1",
+                                    version, schema.ToString());
   std::lock_guard<rw_spinlock> l(schema_lock_);
   schema_ = schema;
   schema_version_ = version;
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 899fc7f..05d4d2f 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1246,9 +1246,9 @@ Status Tablet::CreatePreparedAlterSchema(AlterSchemaTransactionState *tx_state,
   return Status::OK();
 }
 
-Status Tablet::AlterSchema(AlterSchemaTransactionState *tx_state) {
-  DCHECK(key_schema_.KeyTypeEquals(*DCHECK_NOTNULL(tx_state->schema()))) <<
-    "Schema keys cannot be altered(except name)";
+Status Tablet::AlterSchema(AlterSchemaTransactionState* tx_state) {
+  DCHECK(key_schema_.KeyTypeEquals(*DCHECK_NOTNULL(tx_state->schema())))
+      << "Schema keys cannot be altered(except name)";
 
   // Prevent any concurrent flushes. Otherwise, we run into issues where
   // we have an MRS in the rowset tree, and we can't alter its schema
@@ -1258,8 +1258,11 @@ Status Tablet::AlterSchema(AlterSchemaTransactionState *tx_state) {
   // If the current version >= new version, there is nothing to do.
   bool same_schema = schema()->Equals(*tx_state->schema());
   if (metadata_->schema_version() >= tx_state->schema_version()) {
-    LOG_WITH_PREFIX(INFO) << "Already running schema version " << metadata_->schema_version()
-                          << " got alter request for version " << tx_state->schema_version();
+    const string msg =
+        Substitute("Skipping requested alter to schema version $0, tablet already "
+                   "version $1", tx_state->schema_version(), metadata_->schema_version());
+    LOG_WITH_PREFIX(INFO) << msg;
+    tx_state->SetError(Status::InvalidArgument(msg));
     return Status::OK();
   }
 
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index e62549e..20ebbc4 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -27,6 +27,7 @@
 #include <utility>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
@@ -1446,6 +1447,29 @@ Status TabletBootstrap::PlayWriteRequest(const IOContext* io_context,
 Status TabletBootstrap::PlayAlterSchemaRequest(const IOContext* /*io_context*/,
                                                ReplicateMsg* replicate_msg,
                                                const CommitMsg& commit_msg) {
+  // There are three potential outcomes to expect with this replay:
+  // 1. There is no 'result' in the commit message. The alter succeeds, and the
+  //    log updates its schema.
+  // 2. There is no 'result' in the commit message. The alter fails, and the
+  //    log doesn't update its schema. This can happen if trying to replay an
+  //    invalid alter schema request from before we started putting the results
+  //    in the commit message. Note that we'll leave the commit message as is;
+  //    it's harmless since replaying the operation should be a no-op anyway.
+  // 3. The commit message contains a 'result', which should only happen if the
+  //    alter resulted in a failure. Exit out without attempting the alter.
+  if (commit_msg.has_result()) {
+    // If we put a result in the commit message, it should be an error and we
+    // don't need to replay it. In case, in the future, we decide to put
+    // positive results in the commit messages, just filter ops that have
+    // failed statuses instead of D/CHECKing.
+    DCHECK_EQ(1, commit_msg.result().ops_size());
+    const OperationResultPB& op = commit_msg.result().ops(0);
+    if (op.has_failed_status()) {
+      Status error = StatusFromPB(op.failed_status());
+      VLOG(1) << "Played a failed alter request: " << error.ToString();
+      return AppendCommitMsg(commit_msg);
+    }
+  }
   AlterSchemaRequestPB* alter_schema = replicate_msg->mutable_alter_schema_request();
 
   // Decode schema
@@ -1453,19 +1477,16 @@ Status TabletBootstrap::PlayAlterSchemaRequest(const IOContext* /*io_context*/,
   RETURN_NOT_OK(SchemaFromPB(alter_schema->schema(), &schema));
 
   AlterSchemaTransactionState tx_state(nullptr, alter_schema, nullptr);
-
-  // TODO(KUDU-860): we should somehow distinguish if an alter table failed on its original
-  // attempt (e.g due to being an invalid request, or a request with a too-early
-  // schema version).
-
   RETURN_NOT_OK(tablet_->CreatePreparedAlterSchema(&tx_state, &schema));
 
   // Apply the alter schema to the tablet
   RETURN_NOT_OK_PREPEND(tablet_->AlterSchema(&tx_state), "Failed to AlterSchema:");
 
-  // Also update the log information. Normally, the AlterSchema() call above
-  // takes care of this, but our new log isn't hooked up to the tablet yet.
-  log_->SetSchemaForNextLogSegment(schema, tx_state.schema_version());
+  if (!tx_state.error()) {
+    // If the alter completed successfully, update the log segment header. Note
+    // that our new log isn't hooked up to the tablet yet.
+    log_->SetSchemaForNextLogSegment(schema, tx_state.schema_version());
+  }
 
   return AppendCommitMsg(commit_msg);
 }
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index 8508dd5..4cf9f03 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "kudu/tablet/tablet_replica.h"
+
 #include <cstdint>
 #include <memory>
 #include <ostream>
@@ -35,6 +37,7 @@
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/consensus_meta.h"
 #include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/log_anchor_registry.h"
@@ -49,20 +52,21 @@
 #include "kudu/gutil/bind_helpers.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/result_tracker.h"
 #include "kudu/tablet/tablet-test-util.h"
 #include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_bootstrap.h"
 #include "kudu/tablet/tablet_metadata.h"
-#include "kudu/tablet/tablet_replica.h"
 #include "kudu/tablet/tablet_replica_mm_ops.h"
+#include "kudu/tablet/transactions/alter_schema_transaction.h"
 #include "kudu/tablet/transactions/transaction.h"
 #include "kudu/tablet/transactions/transaction_driver.h"
 #include "kudu/tablet/transactions/transaction_tracker.h"
 #include "kudu/tablet/transactions/write_transaction.h"
 #include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_admin.pb.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/maintenance_manager.h"
 #include "kudu/util/metrics.h"
@@ -78,10 +82,14 @@ METRIC_DECLARE_entity(tablet);
 DECLARE_int32(flush_threshold_mb);
 
 namespace kudu {
+
+class MemTracker;
+
 namespace tablet {
 
 using consensus::CommitMsg;
 using consensus::ConsensusBootstrapInfo;
+using consensus::ConsensusMetadata;
 using consensus::ConsensusMetadataManager;
 using consensus::OpId;
 using consensus::RECEIVED_OPID;
@@ -93,9 +101,12 @@ using log::LogOptions;
 using pb_util::SecureDebugString;
 using pb_util::SecureShortDebugString;
 using rpc::Messenger;
+using rpc::ResultTracker;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
+using tserver::AlterSchemaRequestPB;
+using tserver::AlterSchemaResponsePB;
 using tserver::WriteRequestPB;
 using tserver::WriteResponsePB;
 
@@ -111,17 +122,8 @@ class TabletReplicaTest : public KuduTabletTest {
       delete_counter_(0) {
   }
 
-  virtual void SetUp() OVERRIDE {
-    KuduTabletTest::SetUp();
-
-    ASSERT_OK(ThreadPoolBuilder("prepare").Build(&prepare_pool_));
-    ASSERT_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
-    ASSERT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
-
-    rpc::MessengerBuilder builder(CURRENT_TEST_NAME());
-    ASSERT_OK(builder.Build(&messenger_));
-
-    metric_entity_ = METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "test-tablet");
+  void SetUpReplica(bool new_replica = true) {
+    ASSERT_TRUE(tablet_replica_.get() == nullptr);
 
     RaftConfigPB config;
     config.set_opid_index(consensus::kInvalidOpIdIndex);
@@ -132,15 +134,15 @@ class TabletReplicaTest : public KuduTabletTest {
     config_peer->mutable_last_known_addr()->set_port(0);
     config_peer->set_member_type(RaftPeerPB::VOTER);
 
-    scoped_refptr<ConsensusMetadataManager> cmeta_manager(
-        new ConsensusMetadataManager(tablet()->metadata()->fs_manager()));
 
-    ASSERT_OK(cmeta_manager->Create(tablet()->tablet_id(), config, consensus::kMinimumTerm));
+    if (new_replica) {
+      ASSERT_OK(cmeta_manager_->Create(tablet()->tablet_id(), config, consensus::kMinimumTerm));
+    }
 
     // "Bootstrap" and start the TabletReplica.
     tablet_replica_.reset(
       new TabletReplica(tablet()->shared_metadata(),
-                        cmeta_manager,
+                        cmeta_manager_,
                         *config_peer,
                         apply_pool_.get(),
                         Bind(&TabletReplicaTest::TabletReplicaStateChangedCallback,
@@ -154,6 +156,22 @@ class TabletReplicaTest : public KuduTabletTest {
     tablet_replica_->log_anchor_registry_ = tablet()->log_anchor_registry_;
   }
 
+  virtual void SetUp() override {
+    KuduTabletTest::SetUp();
+
+    ASSERT_OK(ThreadPoolBuilder("prepare").Build(&prepare_pool_));
+    ASSERT_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
+    ASSERT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
+
+    rpc::MessengerBuilder builder(CURRENT_TEST_NAME());
+    ASSERT_OK(builder.Build(&messenger_));
+
+    cmeta_manager_.reset(new ConsensusMetadataManager(fs_manager()));
+
+    metric_entity_ = METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "test-tablet");
+    NO_FATALS(SetUpReplica());
+  }
+
   Status StartReplica(const ConsensusBootstrapInfo& info) {
     scoped_refptr<Log> log;
     RETURN_NOT_OK(Log::Open(LogOptions(), fs_manager(), tablet()->tablet_id(),
@@ -164,7 +182,7 @@ class TabletReplicaTest : public KuduTabletTest {
                                   tablet(),
                                   clock(),
                                   messenger_,
-                                  scoped_refptr<rpc::ResultTracker>(),
+                                  scoped_refptr<ResultTracker>(),
                                   log,
                                   prepare_pool_.get());
   }
@@ -179,22 +197,55 @@ class TabletReplicaTest : public KuduTabletTest {
     LOG(INFO) << "Tablet replica state changed for tablet " << tablet_id << ". Reason: " << reason;
   }
 
-  virtual void TearDown() OVERRIDE {
+  virtual void TearDown() override {
     tablet_replica_->Shutdown();
     prepare_pool_->Shutdown();
     apply_pool_->Shutdown();
     KuduTabletTest::TearDown();
   }
 
+  void RestartReplica() {
+    tablet_replica_->Shutdown();
+    tablet_replica_.reset();
+    NO_FATALS(SetUpReplica(/*new_replica=*/ false));
+    scoped_refptr<ConsensusMetadata> cmeta;
+    ASSERT_OK(cmeta_manager_->Load(tablet_replica_->tablet_id(), &cmeta));
+    shared_ptr<Tablet> tablet;
+    scoped_refptr<Log> log;
+    ConsensusBootstrapInfo bootstrap_info;
+
+    tablet_replica_->SetBootstrapping();
+    ASSERT_OK(BootstrapTablet(tablet_replica_->tablet_metadata(),
+                              cmeta->CommittedConfig(),
+                              clock(),
+                              shared_ptr<MemTracker>(),
+                              scoped_refptr<ResultTracker>(),
+                              &metric_registry_,
+                              tablet_replica_,
+                              &tablet,
+                              &log,
+                              tablet_replica_->log_anchor_registry(),
+                              &bootstrap_info));
+    ASSERT_OK(tablet_replica_->Start(bootstrap_info,
+                                     tablet,
+                                     clock(),
+                                     messenger_,
+                                     scoped_refptr<ResultTracker>(),
+                                     log,
+                                     prepare_pool_.get()));
+  }
+
  protected:
   // Generate monotonic sequence of key column integers.
-  Status GenerateSequentialInsertRequest(WriteRequestPB* write_req) {
-    Schema schema(GetTestSchema());
+  Status GenerateSequentialInsertRequest(const Schema& schema,
+                                         WriteRequestPB* write_req) {
     write_req->set_tablet_id(tablet()->tablet_id());
-    CHECK_OK(SchemaToPB(schema, write_req->mutable_schema()));
+    RETURN_NOT_OK(SchemaToPB(schema, write_req->mutable_schema()));
 
     KuduPartialRow row(&schema);
-    CHECK_OK(row.SetInt32("key", insert_counter_++));
+    for (int i = 0; i < schema.num_columns(); i++) {
+      RETURN_NOT_OK(row.SetInt32(i, insert_counter_++));
+    }
 
     RowOperationsPBEncoder enc(write_req->mutable_row_operations());
     enc.Add(RowOperationsPB::INSERT, row);
@@ -217,9 +268,9 @@ class TabletReplicaTest : public KuduTabletTest {
     return Status::OK();
   }
 
-  Status ExecuteWriteAndRollLog(TabletReplica* tablet_replica, const WriteRequestPB& req) {
-    gscoped_ptr<WriteResponsePB> resp(new WriteResponsePB());
-    unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(tablet_replica,
+  Status ExecuteWrite(TabletReplica* replica, const WriteRequestPB& req) {
+    unique_ptr<WriteResponsePB> resp(new WriteResponsePB());
+    unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(replica,
                                                                          &req,
                                                                          nullptr, // No RequestIdPB
                                                                          resp.get()));
@@ -228,18 +279,50 @@ class TabletReplicaTest : public KuduTabletTest {
     tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(
         new LatchTransactionCompletionCallback<WriteResponsePB>(&rpc_latch, resp.get())));
 
-    CHECK_OK(tablet_replica->SubmitWrite(std::move(tx_state)));
+    RETURN_NOT_OK(replica->SubmitWrite(std::move(tx_state)));
+    rpc_latch.Wait();
+    CHECK(!resp->has_error())
+        << "\nReq:\n" << SecureDebugString(req) << "Resp:\n" << SecureDebugString(*resp);
+    return Status::OK();
+  }
+
+  Status UpdateSchema(const SchemaPB& schema, int schema_version) {
+    AlterSchemaRequestPB alter;
+    alter.set_dest_uuid(tablet()->metadata()->fs_manager()->uuid());
+    alter.set_tablet_id(tablet()->tablet_id());
+    alter.set_schema_version(schema_version);
+    *alter.mutable_schema() = schema;
+    return ExecuteAlter(tablet_replica_.get(), alter);
+  }
+
+  Status ExecuteAlter(TabletReplica* replica, const AlterSchemaRequestPB& req) {
+    unique_ptr<AlterSchemaResponsePB> resp(new AlterSchemaResponsePB());
+    unique_ptr<AlterSchemaTransactionState> tx_state(
+        new AlterSchemaTransactionState(replica, &req, resp.get()));
+    CountDownLatch rpc_latch(1);
+    tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(
+          new LatchTransactionCompletionCallback<AlterSchemaResponsePB>(&rpc_latch, resp.get())));
+    RETURN_NOT_OK(replica->SubmitAlterSchema(std::move(tx_state)));
     rpc_latch.Wait();
     CHECK(!resp->has_error())
         << "\nReq:\n" << SecureDebugString(req) << "Resp:\n" << SecureDebugString(*resp);
+    return Status::OK();
+  }
+
+  Status RollLog(TabletReplica* replica) {
+    RETURN_NOT_OK(replica->log_->WaitUntilAllFlushed());
+    return replica->log_->AllocateSegmentAndRollOver();
+  }
+
+  Status ExecuteWriteAndRollLog(TabletReplica* tablet_replica, const WriteRequestPB& req) {
+    RETURN_NOT_OK(ExecuteWrite(tablet_replica, req));
 
     // Roll the log after each write.
     // Usually the append thread does the roll and no additional sync is required. However in
     // this test the thread that is appending is not the same thread that is rolling the log
     // so we must make sure the Log's queue is flushed before we roll or we might have a race
     // between the appender thread and the thread executing the test.
     // between the appender thread and the thread executing the test.
-    CHECK_OK(tablet_replica->log_->WaitUntilAllFlushed());
-    CHECK_OK(tablet_replica->log_->AllocateSegmentAndRollOver());
+    CHECK_OK(RollLog(tablet_replica));
     return Status::OK();
   }
 
@@ -247,7 +330,7 @@ class TabletReplicaTest : public KuduTabletTest {
   Status ExecuteInsertsAndRollLogs(int num_inserts) {
     for (int i = 0; i < num_inserts; i++) {
       gscoped_ptr<WriteRequestPB> req(new WriteRequestPB());
-      RETURN_NOT_OK(GenerateSequentialInsertRequest(req.get()));
+      RETURN_NOT_OK(GenerateSequentialInsertRequest(GetTestSchema(), req.get()));
       RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_replica_.get(), *req));
     }
 
@@ -298,6 +381,8 @@ class TabletReplicaTest : public KuduTabletTest {
   gscoped_ptr<ThreadPool> apply_pool_;
   gscoped_ptr<ThreadPool> raft_pool_;
 
+  scoped_refptr<ConsensusMetadataManager> cmeta_manager_;
+
   // Must be destroyed before thread pools.
   scoped_refptr<TabletReplica> tablet_replica_;
 };
@@ -313,7 +398,7 @@ class DelayedApplyTransaction : public WriteTransaction {
         apply_continue_(DCHECK_NOTNULL(apply_continue)) {
   }
 
-  virtual Status Apply(gscoped_ptr<CommitMsg>* commit_msg) OVERRIDE {
+  virtual Status Apply(gscoped_ptr<CommitMsg>* commit_msg) override {
     apply_started_->CountDown();
     LOG(INFO) << "Delaying apply...";
     apply_continue_->Wait();
@@ -599,5 +684,84 @@ TEST_F(TabletReplicaTest, TestFlushOpsPerfImprovements) {
   stats.Clear();
 }
 
+// Test that the schema of a tablet will be rolled forward upon replaying an
+// alter schema request.
+TEST_F(TabletReplicaTest, TestRollLogSegmentSchemaOnAlter) {
+  ConsensusBootstrapInfo info;
+  ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
+  SchemaPB orig_schema_pb;
+  ASSERT_OK(SchemaToPB(SchemaBuilder(tablet()->metadata()->schema()).Build(), &orig_schema_pb));
+  const int orig_schema_version = tablet()->metadata()->schema_version();
+
+  // Add a new column.
+  SchemaBuilder builder(tablet()->metadata()->schema());
+  ASSERT_OK(builder.AddColumn("new_col", INT32));
+  Schema new_client_schema = builder.BuildWithoutIds();
+  SchemaPB new_schema;
+  ASSERT_OK(SchemaToPB(builder.Build(), &new_schema));
+  ASSERT_OK(UpdateSchema(new_schema, orig_schema_version + 1));
+
+  const auto write = [&] {
+    unique_ptr<WriteRequestPB> req(new WriteRequestPB());
+    ASSERT_OK(GenerateSequentialInsertRequest(new_client_schema, req.get()));
+    ASSERT_OK(ExecuteWrite(tablet_replica_.get(), *req));
+  };
+  // Upon restarting, our log segment header schema should have "new_col".
+  NO_FATALS(write());
+  NO_FATALS(RestartReplica());
+
+  // Get rid of the alter in the WALs.
+  NO_FATALS(write());
+  ASSERT_OK(RollLog(tablet_replica_.get()));
+  NO_FATALS(write());
+  tablet_replica_->tablet()->Flush();
+  ASSERT_OK(tablet_replica_->RunLogGC());
+
+  // Now write some more and restart. If our segment header schema previously
+  // didn't have "new_col", bootstrapping would fail, complaining about a
+  // mismatch between the segment header schema and the write request schema.
+  NO_FATALS(write());
+  NO_FATALS(RestartReplica());
+}
+
+// Regression test for KUDU-2690, wherein an alter schema request that failed
+// (e.g. because of an invalid schema) would roll forward the log segment
+// header schema, causing a failure or crash upon bootstrapping.
+TEST_F(TabletReplicaTest, Kudu2690Test) {
+  ConsensusBootstrapInfo info;
+  ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
+  SchemaPB orig_schema_pb;
+  ASSERT_OK(SchemaToPB(SchemaBuilder(tablet()->metadata()->schema()).Build(), &orig_schema_pb));
+  const int orig_schema_version = tablet()->metadata()->schema_version();
+
+  // First things first, add a new column.
+  SchemaBuilder builder(tablet()->metadata()->schema());
+  ASSERT_OK(builder.AddColumn("new_col", INT32));
+  Schema new_client_schema = builder.BuildWithoutIds();
+  SchemaPB new_schema;
+  ASSERT_OK(SchemaToPB(builder.Build(), &new_schema));
+  ASSERT_OK(UpdateSchema(new_schema, orig_schema_version + 1));
+
+  // Try to update the schema to an older version. Before the fix for
+  // KUDU-2690, this would revert the schema in the next log segment header
+  // upon rolling the log below.
+  ASSERT_OK(UpdateSchema(orig_schema_pb, orig_schema_version));
+
+  // Roll onto a new segment so we can begin filling a new segment. This allows
+  // us to GC the first segment.
+  ASSERT_OK(RollLog(tablet_replica_.get()));
+  {
+    unique_ptr<WriteRequestPB> req(new WriteRequestPB());
+    ASSERT_OK(GenerateSequentialInsertRequest(new_client_schema, req.get()));
+    ASSERT_OK(ExecuteWrite(tablet_replica_.get(), *req));
+  }
+  ASSERT_OK(tablet_replica_->RunLogGC());
+
+  // Before KUDU-2690 was fixed, bootstrapping would fail, complaining that the
+  // write requests contained a column that was not in the log segment header's
+  // schema.
+  NO_FATALS(RestartReplica());
+}
+
 } // namespace tablet
 } // namespace kudu
diff --git a/src/kudu/tablet/transactions/alter_schema_transaction.cc b/src/kudu/tablet/transactions/alter_schema_transaction.cc
index c9f3a70..4da364e 100644
--- a/src/kudu/tablet/transactions/alter_schema_transaction.cc
+++ b/src/kudu/tablet/transactions/alter_schema_transaction.cc
@@ -18,6 +18,7 @@
 #include "kudu/tablet/transactions/alter_schema_transaction.h"
 
 #include <memory>
+#include <ostream>
 #include <utility>
 
 #include <glog/logging.h>
@@ -26,8 +27,10 @@
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
 #include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/raft_consensus.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_replica.h"
@@ -47,16 +50,6 @@ using std::string;
 using std::unique_ptr;
 using strings::Substitute;
 using tserver::TabletServerErrorPB;
-using tserver::AlterSchemaRequestPB;
-using tserver::AlterSchemaResponsePB;
-
-string AlterSchemaTransactionState::ToString() const {
-  return Substitute("AlterSchemaTransactionState "
-                    "[timestamp=$0, schema=$1, request=$2]",
-                    has_timestamp() ? timestamp().ToString() : "<unassigned>",
-                    schema_ == nullptr ? "(none)" : schema_->ToString(),
-                    request_ == nullptr ? "(none)" : SecureShortDebugString(*request_));
-}
 
 void AlterSchemaTransactionState::AcquireSchemaLock(rw_semaphore* l) {
   TRACE("Acquiring schema lock in exclusive mode");
@@ -70,6 +63,21 @@ void AlterSchemaTransactionState::ReleaseSchemaLock() {
   TRACE("Released schema lock");
 }
 
+void AlterSchemaTransactionState::SetError(const Status& s) {
+  CHECK(!s.ok()) << "Expected an error status";
+  error_ = OperationResultPB();
+  StatusToPB(s, error_->mutable_failed_status());
+}
+
+string AlterSchemaTransactionState::ToString() const {
+  return Substitute("AlterSchemaTransactionState "
+                    "[timestamp=$0, schema=$1, request=$2, error=$3]",
+                    has_timestamp() ? timestamp().ToString() : "<unassigned>",
+                    schema_ == nullptr ? "(none)" : schema_->ToString(),
+                    request_ == nullptr ? "(none)" : SecureShortDebugString(*request_),
+                    error_ ? SecureShortDebugString(error_->failed_status()) : "(none)");
+}
+
 AlterSchemaTransaction::AlterSchemaTransaction(unique_ptr<AlterSchemaTransactionState> state,
                                                DriverType type)
     : Transaction(state.get(), type, Transaction::ALTER_SCHEMA_TXN),
@@ -115,6 +123,18 @@ Status AlterSchemaTransaction::Apply(gscoped_ptr<CommitMsg>* commit_msg) {
 
   Tablet* tablet = state_->tablet_replica()->tablet();
   RETURN_NOT_OK(tablet->AlterSchema(state()));
+
+  commit_msg->reset(new CommitMsg());
+  (*commit_msg)->set_op_type(ALTER_SCHEMA_OP);
+
+  // If there was a logical error (e.g. bad schema version) with the alter,
+  // record the error and exit.
+  if (state_->error()) {
+    TxResultPB* result = (*commit_msg)->mutable_result();
+    *result->add_ops() = std::move(*state_->error());
+    return Status::OK();
+  }
+
   state_->tablet_replica()->log()
     ->SetSchemaForNextLogSegment(*DCHECK_NOTNULL(state_->schema()),
                                                  state_->schema_version());
@@ -122,9 +142,6 @@ Status AlterSchemaTransaction::Apply(gscoped_ptr<CommitMsg>* commit_msg) {
   // Altered tablets should be included in the next tserver heartbeat so that
   // clients waiting on IsAlterTableDone() are unblocked promptly.
   state_->tablet_replica()->MarkTabletDirty("Alter schema finished");
-
-  commit_msg->reset(new CommitMsg());
-  (*commit_msg)->set_op_type(ALTER_SCHEMA_OP);
   return Status::OK();
 }
 
diff --git a/src/kudu/tablet/transactions/alter_schema_transaction.h b/src/kudu/tablet/transactions/alter_schema_transaction.h
index ae17933..b33d1a8 100644
--- a/src/kudu/tablet/transactions/alter_schema_transaction.h
+++ b/src/kudu/tablet/transactions/alter_schema_transaction.h
@@ -15,19 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef KUDU_TABLET_ALTER_SCHEMA_TRANSACTION_H_
-#define KUDU_TABLET_ALTER_SCHEMA_TRANSACTION_H_
+#pragma once
 
-#include <cstddef>
 #include <cstdint>
 #include <memory>
 #include <mutex>
 #include <string>
 
+#include <boost/optional/optional.hpp>
+
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
+#include "kudu/tablet/tablet.pb.h"
 #include "kudu/tablet/transactions/transaction.h"
 #include "kudu/tserver/tserver_admin.pb.h"
 #include "kudu/util/status.h"
@@ -52,13 +52,13 @@ class AlterSchemaTransactionState : public TransactionState {
                               const tserver::AlterSchemaRequestPB* request,
                               tserver::AlterSchemaResponsePB* response)
       : TransactionState(tablet_replica),
-        schema_(NULL),
+        schema_(nullptr),
         request_(request),
         response_(response) {
   }
 
-  const tserver::AlterSchemaRequestPB* request() const OVERRIDE { return request_; }
-  tserver::AlterSchemaResponsePB* response() const OVERRIDE { return response_; }
+  const tserver::AlterSchemaRequestPB* request() const override { return request_; }
+  tserver::AlterSchemaResponsePB* response() const override { return response_; }
 
   void set_schema(const Schema* schema) { schema_ = schema; }
   const Schema* schema() const { return schema_; }
@@ -81,15 +81,22 @@ class AlterSchemaTransactionState : public TransactionState {
   // Crashes if the lock was not already acquired.
   void ReleaseSchemaLock();
 
-  // Note: request_ and response_ are set to NULL after this method returns.
+  // Note: request_ and response_ are set to null after this method returns.
   void Finish() {
-    // Make the request NULL since after this transaction commits
+    // Make the request null since after this transaction commits
     // the request may be deleted at any moment.
-    request_ = NULL;
-    response_ = NULL;
+    request_ = nullptr;
+    response_ = nullptr;
+  }
+
+  // Records that the alter failed with the given (non-OK) status.
+  void SetError(const Status& s);
+
+  boost::optional<OperationResultPB> error() const {
+    return error_;
   }
 
-  virtual std::string ToString() const OVERRIDE;
+  std::string ToString() const override;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(AlterSchemaTransactionState);
@@ -103,35 +110,36 @@ class AlterSchemaTransactionState : public TransactionState {
 
   // The lock held on the tablet's schema_lock_.
   std::unique_lock<rw_semaphore> schema_lock_;
+
+  // The error result of this alter schema transaction. May be empty if the
+  // transaction hasn't been applied or if the alter succeeded.
+  boost::optional<OperationResultPB> error_;
 };
 
 // Executes the alter schema transaction,.
 class AlterSchemaTransaction : public Transaction {
  public:
-  AlterSchemaTransaction(std::unique_ptr<AlterSchemaTransactionState> tx_state,
+  AlterSchemaTransaction(std::unique_ptr<AlterSchemaTransactionState> state,
                          consensus::DriverType type);
 
-  virtual AlterSchemaTransactionState* state() OVERRIDE { return state_.get(); }
-  virtual const AlterSchemaTransactionState* state() const OVERRIDE { return state_.get(); }
+  AlterSchemaTransactionState* state() override { return state_.get(); }
+  const AlterSchemaTransactionState* state() const override { return state_.get(); }
 
-  void NewReplicateMsg(gscoped_ptr<consensus::ReplicateMsg>* replicate_msg) OVERRIDE;
+  void NewReplicateMsg(gscoped_ptr<consensus::ReplicateMsg>* replicate_msg) override;
 
   // Executes a Prepare for the alter schema transaction.
-  //
-  // TODO: need a schema lock?
-
-  virtual Status Prepare() OVERRIDE;
+  Status Prepare() override;
 
   // Starts the AlterSchemaTransaction by assigning it a timestamp.
-  virtual Status Start() OVERRIDE;
+  Status Start() override;
 
   // Executes an Apply for the alter schema transaction
-  virtual Status Apply(gscoped_ptr<consensus::CommitMsg>* commit_msg) OVERRIDE;
+  Status Apply(gscoped_ptr<consensus::CommitMsg>* commit_msg) override;
 
   // Actually commits the transaction.
-  virtual void Finish(TransactionResult result) OVERRIDE;
+  void Finish(TransactionResult result) override;
 
-  virtual std::string ToString() const OVERRIDE;
+  std::string ToString() const override;
 
  private:
   std::unique_ptr<AlterSchemaTransactionState> state_;
@@ -141,4 +149,3 @@ class AlterSchemaTransaction : public Transaction {
 }  // namespace tablet
 }  // namespace kudu
 
-#endif /* KUDU_TABLET_ALTER_SCHEMA_TRANSACTION_H_ */
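The three replay outcomes listed in the comment added to TabletBootstrap::PlayAlterSchemaRequest() can be summarized with a small model. This is a sketch under stated assumptions, not the bootstrap code: `ToyCommitMsg`, `ReplayOutcome`, and `ReplayAlter` are hypothetical names, the two `int` versions stand in for the tablet metadata schema version and the log segment header schema version, and `std::optional<std::string>` stands in for a failed `OperationResultPB`.

```cpp
#include <optional>
#include <string>

// Hypothetical replayed ALTER_SCHEMA commit message: 'failed_status' is
// present only if the original Apply() recorded an error.
struct ToyCommitMsg {
  std::optional<std::string> failed_status;
};

enum class ReplayOutcome {
  kAppliedAndRolledSchema,  // Case 1: no result; the alter succeeds.
  kNoOpSchemaUnchanged,     // Case 2: no result (e.g. pre-fix WAL); stale alter.
  kSkippedRecordedFailure,  // Case 3: the commit carries a failed result.
};

// Decision logic modeled on PlayAlterSchemaRequest() in the patch.
ReplayOutcome ReplayAlter(const ToyCommitMsg& commit, int requested_version,
                          int* tablet_version, int* log_version) {
  if (commit.failed_status) {
    // Case 3: don't attempt the alter at all.
    return ReplayOutcome::kSkippedRecordedFailure;
  }
  if (*tablet_version >= requested_version) {
    // Case 2: the tablet treats the alter as a no-op; the log is not rolled.
    return ReplayOutcome::kNoOpSchemaUnchanged;
  }
  // Case 1: apply the alter and update the log segment header schema.
  *tablet_version = requested_version;
  *log_version = requested_version;
  return ReplayOutcome::kAppliedAndRolledSchema;
}
```

Only case 1 touches the log segment header schema, so neither a recorded failure nor a stale alter from an older WAL can roll it backwards.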

