kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [kudu] branch master updated: KUDU-2625: Support per-row error check in prepare stage
Date Fri, 26 Jul 2019 20:47:24 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 4669afc  KUDU-2625: Support per-row error check in prepare stage
4669afc is described below

commit 4669afc237738003431c0cf849ee2da26870b441
Author: Yingchun Lai <405403881@qq.com>
AuthorDate: Sun Jul 21 19:27:45 2019 +0800

    KUDU-2625: Support per-row error check in prepare stage
    
    Tablet servers reject the whole batch if it contains a row that violates
    table schema constraints (e.g., the presence of null values for
    non-nullable columns). This behavior differs from the case where errors
    happen at the later stage of 'applying' received write operations (e.g., a
    duplicate key error), in which only the offending rows are rejected.
    This patch rejects only the 'bad' rows, instead of the whole batch, when
    such errors are detected in the prepare stage.
    
    Change-Id: I497fc3d5d1c9cbb0c183997c9adb8f5efeb9c9d0
    Reviewed-on: http://gerrit.cloudera.org:8080/13864
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/client/client-test.cc                    | 185 +++++++++++++++++++++-
 src/kudu/common/partial_row.h                     |   3 +
 src/kudu/common/row_operations-test.cc            | 128 +++++++++++++--
 src/kudu/common/row_operations.cc                 | 147 ++++++++++-------
 src/kudu/common/row_operations.h                  |  14 +-
 src/kudu/common/schema.h                          |   2 +-
 src/kudu/tablet/row_op.cc                         |  17 +-
 src/kudu/tablet/row_op.h                          |  19 +--
 src/kudu/tablet/tablet.cc                         |   9 +-
 src/kudu/tablet/transactions/write_transaction.cc |  33 ++--
 src/kudu/tablet/transactions/write_transaction.h  |   2 +
 11 files changed, 439 insertions(+), 120 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 10c3223..290380e 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -2412,11 +2412,16 @@ static Status ApplyInsertToSession(KuduSession* session,
                                    const shared_ptr<KuduTable>& table,
                                    int row_key,
                                    int int_val,
-                                   const char* string_val) {
+                                   const char* string_val,
+                                   boost::optional<int> non_null_with_default = boost::none) {
   unique_ptr<KuduInsert> insert(table->NewInsert());
   RETURN_NOT_OK(insert->mutable_row()->SetInt32("key", row_key));
   RETURN_NOT_OK(insert->mutable_row()->SetInt32("int_val", int_val));
   RETURN_NOT_OK(insert->mutable_row()->SetStringCopy("string_val", string_val));
+  if (non_null_with_default) {
+    RETURN_NOT_OK(insert->mutable_row()->SetInt32("non_null_with_default",
+                                                  non_null_with_default.get()));
+  }
   return session->Apply(insert.release());
 }
 
@@ -2444,9 +2449,13 @@ static Status ApplyUpdateToSession(KuduSession* session,
 
 static Status ApplyDeleteToSession(KuduSession* session,
                                    const shared_ptr<KuduTable>& table,
-                                   int row_key) {
+                                   int row_key,
+                                   boost::optional<int> int_val = boost::none) {
   unique_ptr<KuduDelete> del(table->NewDelete());
   RETURN_NOT_OK(del->mutable_row()->SetInt32("key", row_key));
+  if (int_val) {
+    RETURN_NOT_OK(del->mutable_row()->SetInt32("int_val", int_val.get()));
+  }
   return session->Apply(del.release());
 }
 
@@ -2620,9 +2629,9 @@ TEST_F(ClientTest, TestMultipleMultiRowManualBatches) {
             , rows[0]);
 }
 
-// Test a batch where one of the inserted rows succeeds while another
-// fails.
-TEST_F(ClientTest, TestBatchWithPartialError) {
+// Test a batch where one of the inserted rows succeeds while another fails.
+// 1. Insert duplicate keys.
+TEST_F(ClientTest, TestBatchWithPartialErrorOfDuplicateKeys) {
   shared_ptr<KuduSession> session = client_->NewSession();
   ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
 
@@ -2639,14 +2648,17 @@ TEST_F(ClientTest, TestBatchWithPartialError) {
   ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
 
   // Fetch and verify the reported error.
-  unique_ptr<KuduError> error = GetSingleErrorFromSession(session.get());
+  unique_ptr<KuduError> error;
+  NO_FATALS(error = GetSingleErrorFromSession(session.get()));
   ASSERT_TRUE(error->status().IsAlreadyPresent());
+  ASSERT_EQ(error->status().ToString(),
+            "Already present: key already present");
   ASSERT_EQ(error->failed_op().ToString(),
             R"(INSERT int32 key=1, int32 int_val=1, string string_val="Attempted dup")");
 
   // Verify that the other row was successfully inserted
   vector<string> rows;
-  ScanTableToStrings(client_table_.get(), &rows);
+  NO_FATALS(ScanTableToStrings(client_table_.get(), &rows));
   ASSERT_EQ(2, rows.size());
   std::sort(rows.begin(), rows.end());
   ASSERT_EQ(R"((int32 key=1, int32 int_val=1, string string_val="original row", )"
@@ -2655,6 +2667,165 @@ TEST_F(ClientTest, TestBatchWithPartialError) {
             "int32 non_null_with_default=12345)", rows[1]);
 }
 
+// 2. Insert a row missing a required column.
+TEST_F(ClientTest, TestBatchWithPartialErrorOfMissingRequiredColumn) {
+  shared_ptr<KuduSession> session = client_->NewSession();
+  ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+
+  // Remove default value of a non-nullable column.
+  unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+  table_alterer->AlterColumn("non_null_with_default")->RemoveDefault();
+  ASSERT_OK(table_alterer->Alter());
+
+  // Insert a row successfully.
+  ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, 1, 1, "Should succeed", 1));
+  // Insert a row missing a required column, which will fail.
+  ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, 2, 2, "Missing required column"));
+  Status s = session->Flush();
+  ASSERT_FALSE(s.ok());
+  ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+
+  // Fetch and verify the reported error.
+  unique_ptr<KuduError> error;
+  NO_FATALS(error = GetSingleErrorFromSession(session.get()));
+  ASSERT_TRUE(error->status().IsInvalidArgument());
+  ASSERT_EQ(error->status().ToString(),
+            "Invalid argument: No value provided for required column: "
+            "non_null_with_default INT32 NOT NULL");
+  ASSERT_EQ(error->failed_op().ToString(),
+            R"(INSERT int32 key=2, int32 int_val=2, )"
+            R"(string string_val="Missing required column")");
+
+  // Verify that the other row was successfully inserted
+  vector<string> rows;
+  NO_FATALS(ScanTableToStrings(client_table_.get(), &rows));
+  ASSERT_EQ(1, rows.size());
+  std::sort(rows.begin(), rows.end());
+  ASSERT_EQ(R"((int32 key=1, int32 int_val=1, string string_val="Should succeed", )"
+            "int32 non_null_with_default=1)", rows[0]);
+}
+
+// 3. No fields updated for a row.
+TEST_F(ClientTest, TestBatchWithPartialErrorOfNoFieldsUpdated) {
+  shared_ptr<KuduSession> session = client_->NewSession();
+  ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+
+  // Insert two rows successfully.
+  ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, 1, 1, "One"));
+  ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, 2, 2, "Two"));
+  ASSERT_OK(session->Flush());
+
+  // Update a row without any non-key fields updated, which will fail.
+  unique_ptr<KuduUpdate> update(client_table_->NewUpdate());
+  ASSERT_OK(update->mutable_row()->SetInt32("key", 1));
+  ASSERT_OK(session->Apply(update.release()));
+  // Update a row with some non-key fields updated, which will success.
+  ASSERT_OK(ApplyUpdateToSession(session.get(), client_table_, 2, 22));
+  Status s = session->Flush();
+  ASSERT_FALSE(s.ok());
+  ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+
+  // Fetch and verify the reported error.
+  unique_ptr<KuduError> error;
+  NO_FATALS(error = GetSingleErrorFromSession(session.get()));
+  ASSERT_TRUE(error->status().IsInvalidArgument());
+  ASSERT_EQ(error->status().ToString(),
+            "Invalid argument: No fields updated, key is: (int32 key=1)");
+  ASSERT_EQ(error->failed_op().ToString(), R"(UPDATE int32 key=1)");
+
+  // Verify that the other row was successfully updated.
+  vector<string> rows;
+  NO_FATALS(ScanTableToStrings(client_table_.get(), &rows));
+  ASSERT_EQ(2, rows.size());
+  std::sort(rows.begin(), rows.end());
+  ASSERT_EQ(R"((int32 key=1, int32 int_val=1, string string_val="One", )"
+            "int32 non_null_with_default=12345)", rows[0]);
+  ASSERT_EQ(R"((int32 key=2, int32 int_val=22, string string_val="Two", )"
+            "int32 non_null_with_default=12345)", rows[1]);
+}
+
+// 4. Delete a row with a non-key column specified.
+TEST_F(ClientTest, TestBatchWithPartialErrorOfNonKeyColumnSpecifiedDelete) {
+  shared_ptr<KuduSession> session = client_->NewSession();
+  ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+
+  // Insert two rows successfully.
+  ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, 1, 1, "One"));
+  ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, 2, 2, "Two"));
+  ASSERT_OK(session->Flush());
+
+  // Delete a row without any non-key fields, which will success.
+  ASSERT_OK(ApplyDeleteToSession(session.get(), client_table_, 1));
+  // Delete a row with some non-key fields, which will fail.
+  ASSERT_OK(ApplyDeleteToSession(session.get(), client_table_, 2, 2));
+  Status s = session->Flush();
+  ASSERT_FALSE(s.ok());
+  ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+
+  // Fetch and verify the reported error.
+  unique_ptr<KuduError> error;
+  NO_FATALS(error = GetSingleErrorFromSession(session.get()));
+  ASSERT_TRUE(error->status().IsInvalidArgument());
+  ASSERT_EQ(error->status().ToString(),
+            "Invalid argument: DELETE should not have a value for column: "
+            "int_val INT32 NOT NULL");
+  ASSERT_EQ(error->failed_op().ToString(), R"(DELETE int32 key=2, int32 int_val=2)");
+
+  // Verify that the other row was successfully deleted.
+  vector<string> rows;
+  NO_FATALS(ScanTableToStrings(client_table_.get(), &rows));
+  ASSERT_EQ(1, rows.size());
+  std::sort(rows.begin(), rows.end());
+  ASSERT_EQ(R"((int32 key=2, int32 int_val=2, string string_val="Two", )"
+            "int32 non_null_with_default=12345)", rows[0]);
+}
+
+// 5. All row failed in prepare phase.
+TEST_F(ClientTest, TestBatchWithPartialErrorOfAllRowsFailed) {
+  shared_ptr<KuduSession> session = client_->NewSession();
+  ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+
+  // Insert two rows successfully.
+  ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, 1, 1, "One"));
+  ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, 2, 2, "Two"));
+  ASSERT_OK(session->Flush());
+
+  // Delete rows with some non-key fields, which will fail.
+  ASSERT_OK(ApplyDeleteToSession(session.get(), client_table_, 1, 1));
+  ASSERT_OK(ApplyDeleteToSession(session.get(), client_table_, 2, 2));
+  Status s = session->Flush();
+  ASSERT_FALSE(s.ok());
+  ASSERT_STR_CONTAINS(s.ToString(), "Some errors occurred");
+
+  // Fetch and verify the reported error.
+  vector<KuduError*> errors;
+  ElementDeleter d(&errors);
+  bool overflow;
+  session->GetPendingErrors(&errors, &overflow);
+  ASSERT_TRUE(!overflow);
+  ASSERT_EQ(2, errors.size());
+  ASSERT_TRUE(errors[0]->status().IsInvalidArgument());
+  ASSERT_EQ(errors[0]->status().ToString(),
+            "Invalid argument: DELETE should not have a value for column: "
+            "int_val INT32 NOT NULL");
+  ASSERT_EQ(errors[0]->failed_op().ToString(), R"(DELETE int32 key=1, int32 int_val=1)");
+  ASSERT_TRUE(errors[1]->status().IsInvalidArgument());
+  ASSERT_EQ(errors[1]->status().ToString(),
+            "Invalid argument: DELETE should not have a value for column: "
+            "int_val INT32 NOT NULL");
+  ASSERT_EQ(errors[1]->failed_op().ToString(), R"(DELETE int32 key=2, int32 int_val=2)");
+
+  // Verify that no row was deleted.
+  vector<string> rows;
+  NO_FATALS(ScanTableToStrings(client_table_.get(), &rows));
+  ASSERT_EQ(2, rows.size());
+  std::sort(rows.begin(), rows.end());
+  ASSERT_EQ(R"((int32 key=1, int32 int_val=1, string string_val="One", )"
+            "int32 non_null_with_default=12345)", rows[0]);
+  ASSERT_EQ(R"((int32 key=2, int32 int_val=2, string string_val="Two", )"
+            "int32 non_null_with_default=12345)", rows[1]);
+}
+
 void ClientTest::DoTestWriteWithDeadServer(WhichServerToKill which) {
   shared_ptr<KuduSession> session = client_->NewSession();
   session->SetTimeoutMillis(1000);
diff --git a/src/kudu/common/partial_row.h b/src/kudu/common/partial_row.h
index d43d109..74375c3 100644
--- a/src/kudu/common/partial_row.h
+++ b/src/kudu/common/partial_row.h
@@ -514,6 +514,9 @@ class KUDU_EXPORT KuduPartialRow {
   FRIEND_TEST(PartitionPrunerTest, TestIntPartialPrimaryKeyRangePruning);
   FRIEND_TEST(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning);
   FRIEND_TEST(PartitionPrunerTest, TestPrimaryKeyRangePruning);
+  FRIEND_TEST(RowOperationsTest, ProjectionTestWholeSchemaSpecified);
+  FRIEND_TEST(RowOperationsTest, TestProjectUpdates);
+  FRIEND_TEST(RowOperationsTest, TestProjectDeletes);
 
   template<typename T>
   Status Set(const Slice& col_name, const typename T::cpp_type& val,
diff --git a/src/kudu/common/row_operations-test.cc b/src/kudu/common/row_operations-test.cc
index 76c869f..ccb08e6 100644
--- a/src/kudu/common/row_operations-test.cc
+++ b/src/kudu/common/row_operations-test.cc
@@ -24,18 +24,21 @@
 #include <string>
 #include <vector>
 
-#include <gtest/gtest.h>
 #include <glog/logging.h>
+#include <gtest/gtest.h>
 
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
+#include "kudu/common/row.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/dynamic_annotations.h"
 #include "kudu/gutil/macros.h"
-#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/util/bitmap.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
@@ -45,8 +48,6 @@
 using std::shared_ptr;
 using std::string;
 using std::vector;
-using strings::Substitute;
-using strings::SubstituteAndAppend;
 
 namespace kudu {
 
@@ -372,6 +373,15 @@ string TestProjection(RowOperationsPB::Type type,
   return ops[0].ToString(server_schema);
 }
 
+Status FindColumn(const Schema& schema, const Slice& col_name, int* idx) {
+  StringPiece sp(reinterpret_cast<const char*>(col_name.data()), col_name.size());
+  *idx = schema.find_column(sp);
+  if (PREDICT_FALSE(*idx == -1)) {
+    return Status::NotFound("No such column", col_name);
+  }
+  return Status::OK();
+}
+
 } // anonymous namespace
 
 // Test decoding partial rows from a client who has a schema which matches
@@ -381,12 +391,32 @@ TEST_F(RowOperationsTest, ProjectionTestWholeSchemaSpecified) {
                          ColumnSchema("int_val", INT32),
                          ColumnSchema("string_val", STRING, true) },
                        1);
+  // Test a row missing 'key', which is key column.
+  {
+    KuduPartialRow client_row(&client_schema);
+    EXPECT_EQ("row error: Invalid argument: No value provided for required column: "
+              "key INT32 NOT NULL",
+              TestProjection(RowOperationsPB::INSERT, client_row, schema_));
+  }
+
+  // Force to set null on key column.
+  {
+    int col_idx;
+    ASSERT_OK(FindColumn(client_schema, "key", &col_idx));
+    KuduPartialRow client_row(&client_schema);
+    ContiguousRow row(&client_schema, client_row.row_data_);
+    row.set_null(col_idx, true);
+    BitmapSet(client_row.isset_bitmap_, col_idx);
+    EXPECT_EQ("row error: Invalid argument: NULL values not allowed for non-nullable column: "
+              "key INT32 NOT NULL",
+              TestProjection(RowOperationsPB::INSERT, client_row, schema_));
+  }
 
   // Test a row missing 'int_val', which is required.
   {
     KuduPartialRow client_row(&client_schema);
     CHECK_OK(client_row.SetInt32("key", 12345));
-    EXPECT_EQ("error: Invalid argument: No value provided for required column: "
+    EXPECT_EQ("row error: Invalid argument: No value provided for required column: "
               "int_val INT32 NOT NULL",
               TestProjection(RowOperationsPB::INSERT, client_row, schema_));
   }
@@ -469,6 +499,19 @@ TEST_F(RowOperationsTest, ProjectionTestWithDefaults) {
               " int32 non_null_with_default=54321)",
               TestProjection(RowOperationsPB::INSERT, client_row, server_schema));
   }
+
+  // Specify the key and override both defaults, overriding the non-nullable
+  // one to NULL.
+  {
+    KuduPartialRow client_row(&client_schema);
+    CHECK_OK(client_row.SetInt32("key", 12345));
+    CHECK_OK(client_row.SetInt32("nullable_with_default", 12345));
+    Status s = client_row.SetNull("non_null_with_default");
+    CHECK(s.IsInvalidArgument());
+    EXPECT_EQ("INSERT (int32 key=12345, int32 nullable_with_default=12345,"
+              " int32 non_null_with_default=456)",
+              TestProjection(RowOperationsPB::INSERT, client_row, server_schema));
+  }
 }
 
 // Test cases where the client only has a subset of the fields
@@ -492,7 +535,7 @@ TEST_F(RowOperationsTest, ProjectionTestWithClientHavingValidSubset) {
   {
     KuduPartialRow client_row(&client_schema);
     CHECK_OK(client_row.SetInt32("key", 12345));
-    EXPECT_EQ("error: Invalid argument: No value provided for required column:"
+    EXPECT_EQ("row error: Invalid argument: No value provided for required column:"
               " int_val INT32 NOT NULL",
               TestProjection(RowOperationsPB::INSERT, client_row, server_schema));
   }
@@ -539,15 +582,14 @@ TEST_F(RowOperationsTest, TestProjectUpdates) {
 
   // Check without specifying any columns
   KuduPartialRow client_row(&client_schema);
-  EXPECT_EQ("error: Invalid argument: No value provided for key column: key INT32 NOT NULL",
+  EXPECT_EQ("row error: Invalid argument: No value provided for key column: key INT32 NOT NULL",
             TestProjection(RowOperationsPB::UPDATE, client_row, server_schema));
 
   // Specify the key and no columns to update
   ASSERT_OK(client_row.SetInt32("key", 12345));
-  EXPECT_EQ("error: Invalid argument: No fields updated, key is: (int32 key=12345)",
+  EXPECT_EQ("row error: Invalid argument: No fields updated, key is: (int32 key=12345)",
             TestProjection(RowOperationsPB::UPDATE, client_row, server_schema));
 
-
   // Specify the key and update one column.
   ASSERT_OK(client_row.SetInt32("int_val", 12345));
   EXPECT_EQ("MUTATE (int32 key=12345) SET int_val=12345",
@@ -562,6 +604,33 @@ TEST_F(RowOperationsTest, TestProjectUpdates) {
   ASSERT_OK(client_row.SetNull("string_val"));
   EXPECT_EQ("MUTATE (int32 key=12345) SET int_val=12345, string_val=NULL",
             TestProjection(RowOperationsPB::UPDATE, client_row, server_schema));
+
+  // Force to set null on key column.
+  {
+    KuduPartialRow client_row(&client_schema);
+    int col_idx;
+    ASSERT_OK(FindColumn(client_schema, "key", &col_idx));
+    ContiguousRow row(&client_schema, client_row.row_data_);
+    row.set_null(col_idx, true);
+    BitmapSet(client_row.isset_bitmap_, col_idx);
+    EXPECT_EQ("row error: Invalid argument: NULL values not allowed for key column: "
+              "key INT32 NOT NULL",
+              TestProjection(RowOperationsPB::UPDATE, client_row, server_schema));
+  }
+
+  // Force to set null on non-nullable column.
+  {
+    KuduPartialRow client_row(&client_schema);
+    ASSERT_OK(client_row.SetInt32("key", 12345));
+    int col_idx;
+    ASSERT_OK(FindColumn(client_schema, "int_val", &col_idx));
+    ContiguousRow row(&client_schema, client_row.row_data_);
+    row.set_null(col_idx, true);
+    BitmapSet(client_row.isset_bitmap_, col_idx);
+    EXPECT_EQ("row error: Invalid argument: NULL value not allowed for non-nullable column: "
+              "int_val INT32 NOT NULL",
+              TestProjection(RowOperationsPB::UPDATE, client_row, server_schema));
+  }
 }
 
 // Client schema has the columns in a different order. Makes
@@ -623,18 +692,19 @@ TEST_F(RowOperationsTest, TestClientMismatchedType) {
 TEST_F(RowOperationsTest, TestProjectDeletes) {
   Schema client_schema({ ColumnSchema("key", INT32),
                          ColumnSchema("key_2", INT32),
+                         ColumnSchema("int_val", INT32),
                          ColumnSchema("string_val", STRING, true) },
                        2);
   Schema server_schema = SchemaBuilder(client_schema).Build();
 
   KuduPartialRow client_row(&client_schema);
   // No columns set
-  EXPECT_EQ("error: Invalid argument: No value provided for key column: key INT32 NOT NULL",
+  EXPECT_EQ("row error: Invalid argument: No value provided for key column: key INT32 NOT NULL",
             TestProjection(RowOperationsPB::DELETE, client_row, server_schema));
 
   // Only half the key set
   ASSERT_OK(client_row.SetInt32("key", 12345));
-  EXPECT_EQ("error: Invalid argument: No value provided for key column: key_2 INT32 NOT NULL",
+  EXPECT_EQ("row error: Invalid argument: No value provided for key column: key_2 INT32 NOT NULL",
             TestProjection(RowOperationsPB::DELETE, client_row, server_schema));
 
   // Whole key set (correct)
@@ -642,11 +712,45 @@ TEST_F(RowOperationsTest, TestProjectDeletes) {
   EXPECT_EQ("MUTATE (int32 key=12345, int32 key_2=54321) DELETE",
             TestProjection(RowOperationsPB::DELETE, client_row, server_schema));
 
+  // Extra column set
+  ASSERT_OK(client_row.SetNull("string_val"));
+  EXPECT_EQ("row error: Invalid argument: DELETE should not have a value for column: "
+            "string_val STRING NULLABLE",
+            TestProjection(RowOperationsPB::DELETE, client_row, server_schema));
+
   // Extra column set (incorrect)
   ASSERT_OK(client_row.SetStringNoCopy("string_val", "hello"));
-  EXPECT_EQ("error: Invalid argument: DELETE should not have a value for column: "
+  EXPECT_EQ("row error: Invalid argument: DELETE should not have a value for column: "
             "string_val STRING NULLABLE",
             TestProjection(RowOperationsPB::DELETE, client_row, server_schema));
+
+  // Force to set null on key column.
+  {
+    KuduPartialRow client_row(&client_schema);
+    int col_idx;
+    ASSERT_OK(FindColumn(client_schema, "key", &col_idx));
+    ContiguousRow row(&client_schema, client_row.row_data_);
+    row.set_null(col_idx, true);
+    BitmapSet(client_row.isset_bitmap_, col_idx);
+    EXPECT_EQ("row error: Invalid argument: NULL values not allowed for key column: "
+              "key INT32 NOT NULL",
+              TestProjection(RowOperationsPB::DELETE, client_row, server_schema));
+  }
+
+  // Force to set null on non-key column.
+  {
+    KuduPartialRow client_row(&client_schema);
+    ASSERT_OK(client_row.SetInt32("key", 12345));
+    ASSERT_OK(client_row.SetInt32("key_2", 12345));
+    int col_idx;
+    ASSERT_OK(FindColumn(client_schema, "int_val", &col_idx));
+    ContiguousRow row(&client_schema, client_row.row_data_);
+    row.set_null(col_idx, true);
+    BitmapSet(client_row.isset_bitmap_, col_idx);
+    EXPECT_EQ("row error: Invalid argument: DELETE should not have a value for column: "
+              "int_val INT32 NOT NULL",
+              TestProjection(RowOperationsPB::DELETE, client_row, server_schema));
+  }
 }
 
 TEST_F(RowOperationsTest, SplitKeyRoundTrip) {
diff --git a/src/kudu/common/row_operations.cc b/src/kudu/common/row_operations.cc
index 952b56c..166f559 100644
--- a/src/kudu/common/row_operations.cc
+++ b/src/kudu/common/row_operations.cc
@@ -20,6 +20,7 @@
 #include <cstring>
 #include <ostream>
 #include <string>
+#include <utility>
 
 #include <glog/logging.h>
 
@@ -45,6 +46,9 @@ using strings::Substitute;
 namespace kudu {
 
 string DecodedRowOperation::ToString(const Schema& schema) const {
+  if (!result.ok()) {
+    return Substitute("row error: $0", result.ToString());
+  }
   // A note on redaction: We redact row operations, since they contain sensitive
   // row data. Range partition operations are not redacted, since range
   // partitions are considered to be metadata.
@@ -76,6 +80,13 @@ string DecodedRowOperation::ToString(const Schema& schema) const {
   return "UNKNOWN";
 }
 
+void DecodedRowOperation::SetFailureStatusOnce(Status s) {
+  DCHECK(!s.ok());
+  if (result.ok()) {
+    result = std::move(s);
+  }
+}
+
 RowOperationsPBEncoder::RowOperationsPBEncoder(RowOperationsPB* pb)
   : pb_(pb) {
 }
@@ -192,7 +203,7 @@ Status RowOperationsPBDecoder::ReadNullBitmap(const uint8_t** null_bm) {
 }
 
 Status RowOperationsPBDecoder::GetColumnSlice(const ColumnSchema& col, Slice* slice) {
-  int size = col.type_info()->size();
+  size_t size = col.type_info()->size();
   if (PREDICT_FALSE(src_.size() < size)) {
     return Status::Corruption("Not enough data for column", col.ToString());
   }
@@ -200,8 +211,8 @@ Status RowOperationsPBDecoder::GetColumnSlice(const ColumnSchema& col, Slice* sl
   if (col.type_info()->physical_type() == BINARY) {
     // The Slice in the protobuf has a pointer relative to the indirect data,
     // not a real pointer. Need to fix that.
-    const Slice* ptr_slice = reinterpret_cast<const Slice*>(src_.data());
-    size_t offset_in_indirect = reinterpret_cast<uintptr_t>(ptr_slice->data());
+    auto ptr_slice = reinterpret_cast<const Slice*>(src_.data());
+    auto offset_in_indirect = reinterpret_cast<uintptr_t>(ptr_slice->data());
     bool overflowed = false;
     size_t max_offset = AddWithOverflowCheck(offset_in_indirect, ptr_slice->size(), &overflowed);
     if (PREDICT_FALSE(overflowed || max_offset > pb_->indirect_data().size())) {
@@ -227,6 +238,12 @@ Status RowOperationsPBDecoder::ReadColumn(const ColumnSchema& col, uint8_t* dst)
   return Status::OK();
 }
 
+Status RowOperationsPBDecoder::ReadColumnAndDiscard(const ColumnSchema& col) {
+  uint8_t scratch[kLargestTypeSize];
+  RETURN_NOT_OK(ReadColumn(col, scratch));
+  return Status::OK();
+}
+
 bool RowOperationsPBDecoder::HasNext() const {
   return !src_.empty();
 }
@@ -271,7 +288,7 @@ class ClientServerMapping {
     DCHECK_EQ(client_to_tablet_.size(), client_col_idx);
     DCHECK_LT(tablet_col_idx, saw_tablet_col_.size());
     client_to_tablet_.push_back(tablet_col_idx);
-    saw_tablet_col_[tablet_col_idx] = 1;
+    saw_tablet_col_[tablet_col_idx] = true;
     return Status::OK();
   }
 
@@ -288,12 +305,12 @@ class ClientServerMapping {
   }
 
   // Translate from a client schema index to the tablet schema index
-  int client_to_tablet_idx(int client_idx) const {
+  size_t client_to_tablet_idx(size_t client_idx) const {
     DCHECK_LT(client_idx, client_to_tablet_.size());
     return client_to_tablet_[client_idx];
   }
 
-  int num_mapped() const {
+  size_t num_mapped() const {
     return client_to_tablet_.size();
   }
 
@@ -301,7 +318,7 @@ class ClientServerMapping {
   // server side schema are found in the client-side schema. If not,
   // returns an InvalidArgument.
   Status CheckAllRequiredColumnsPresent() {
-    for (int tablet_col_idx = 0;
+    for (size_t tablet_col_idx = 0;
          tablet_col_idx < tablet_schema_->num_columns();
          tablet_col_idx++) {
       const ColumnSchema& col = tablet_schema_->column(tablet_col_idx);
@@ -320,12 +337,11 @@ class ClientServerMapping {
  private:
   const Schema* const client_schema_;
   const Schema* const tablet_schema_;
-  vector<int> client_to_tablet_;
+  vector<size_t> client_to_tablet_;
   vector<bool> saw_tablet_col_;
   DISALLOW_COPY_AND_ASSIGN(ClientServerMapping);
 };
 
-
 Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row_storage,
                                                     const ClientServerMapping& mapping,
                                                     DecodedRowOperation* op) {
@@ -339,9 +355,9 @@ Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row
   }
 
   // Allocate a row with the tablet's layout.
-  uint8_t* tablet_row_storage = reinterpret_cast<uint8_t*>(
+  auto tablet_row_storage = reinterpret_cast<uint8_t*>(
       dst_arena_->AllocateBytesAligned(tablet_row_size_, 8));
-  uint8_t* tablet_isset_bitmap = reinterpret_cast<uint8_t*>(
+  auto tablet_isset_bitmap = reinterpret_cast<uint8_t*>(
       dst_arena_->AllocateBytes(BitmapSize(tablet_schema_->num_columns())));
   if (PREDICT_FALSE(!tablet_row_storage || !tablet_isset_bitmap)) {
     return Status::RuntimeError("Out of memory");
@@ -357,12 +373,13 @@ Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row
 
   // Now handle each of the columns passed by the user, replacing the defaults
   // from the prototype.
-  for (int client_col_idx = 0; client_col_idx < client_schema_->num_columns(); client_col_idx++) {
+  for (size_t client_col_idx = 0;
+       client_col_idx < client_schema_->num_columns();
+       client_col_idx++) {
     // Look up the corresponding column from the tablet. We use the server-side
     // ColumnSchema object since it has the most up-to-date default, nullability,
     // etc.
-    int tablet_col_idx = mapping.client_to_tablet_idx(client_col_idx);
-    DCHECK_GE(tablet_col_idx, 0);
+    size_t tablet_col_idx = mapping.client_to_tablet_idx(client_col_idx);
     const ColumnSchema& col = tablet_schema_->column(tablet_col_idx);
 
     bool isset = BitmapTest(client_isset_map, client_col_idx);
@@ -371,24 +388,25 @@ Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row
       // If the client provided a value for this column, copy it.
 
       // Copy null-ness, if the server side column is nullable.
-      bool client_set_to_null = col.is_nullable() &&
+      bool client_set_to_null = client_schema_->has_nullables() &&
         BitmapTest(client_null_map, client_col_idx);
       if (col.is_nullable()) {
         tablet_row.set_null(tablet_col_idx, client_set_to_null);
       }
-      // Copy the value if it's not null
       if (!client_set_to_null) {
+        // Copy the value if it's not null.
         RETURN_NOT_OK(ReadColumn(col, tablet_row.mutable_cell_ptr(tablet_col_idx)));
+      } else if (PREDICT_FALSE(!col.is_nullable())) {
+        op->SetFailureStatusOnce(Status::InvalidArgument(
+            "NULL values not allowed for non-nullable column", col.ToString()));
+        RETURN_NOT_OK(ReadColumnAndDiscard(col));
       }
     } else {
       // If the client didn't provide a value, then the column must either be nullable or
-      // have a default (which was already set in the prototype row.
-
+      // have a default (which was already set in the prototype row).
       if (PREDICT_FALSE(!(col.is_nullable() || col.has_write_default()))) {
-        // TODO: change this to return per-row errors. Otherwise if one row in a batch
-        // is missing a field for some reason, the whole batch will fail.
-        return Status::InvalidArgument("No value provided for required column",
-                                       col.ToString());
+        op->SetFailureStatusOnce(Status::InvalidArgument("No value provided for required column",
+                                                         col.ToString()));
       }
     }
   }
@@ -400,7 +418,7 @@ Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row
 
 Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& mapping,
                                                     DecodedRowOperation* op) {
-  int rowkey_size = tablet_schema_->key_byte_size();
+  size_t rowkey_size = tablet_schema_->key_byte_size();
 
   const uint8_t* client_isset_map = nullptr;
   const uint8_t* client_null_map = nullptr;
@@ -412,7 +430,7 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m
   }
 
   // Allocate space for the row key.
-  uint8_t* rowkey_storage = reinterpret_cast<uint8_t*>(
+  auto rowkey_storage = reinterpret_cast<uint8_t*>(
     dst_arena_->AllocateBytesAligned(rowkey_size, 8));
   if (PREDICT_FALSE(!rowkey_storage)) {
     return Status::RuntimeError("Out of memory");
@@ -424,26 +442,29 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m
   ContiguousRow rowkey(tablet_schema_, rowkey_storage);
 
   // First process the key columns.
-  int client_col_idx = 0;
+  size_t client_col_idx = 0;
   for (; client_col_idx < client_schema_->num_key_columns(); client_col_idx++) {
     // Look up the corresponding column from the tablet. We use the server-side
     // ColumnSchema object since it has the most up-to-date default, nullability,
     // etc.
     DCHECK_EQ(mapping.client_to_tablet_idx(client_col_idx),
               client_col_idx) << "key columns should match";
-    int tablet_col_idx = client_col_idx;
+    size_t tablet_col_idx = client_col_idx;
 
     const ColumnSchema& col = tablet_schema_->column(tablet_col_idx);
     if (PREDICT_FALSE(!BitmapTest(client_isset_map, client_col_idx))) {
-      return Status::InvalidArgument("No value provided for key column",
-                                     col.ToString());
+      op->SetFailureStatusOnce(Status::InvalidArgument("No value provided for key column",
+                                                       col.ToString()));
+      continue;
     }
 
     bool client_set_to_null = client_schema_->has_nullables() &&
       BitmapTest(client_null_map, client_col_idx);
     if (PREDICT_FALSE(client_set_to_null)) {
-      return Status::InvalidArgument("NULL values not allowed for key column",
-                                     col.ToString());
+      op->SetFailureStatusOnce(Status::InvalidArgument("NULL values not allowed for key column",
+                                                       col.ToString()));
+      RETURN_NOT_OK(ReadColumnAndDiscard(col));
+      continue;
     }
 
     RETURN_NOT_OK(ReadColumn(col, rowkey.mutable_cell_ptr(tablet_col_idx)));
@@ -460,25 +481,22 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m
 
     // Now process the rest of columns as updates.
     for (; client_col_idx < client_schema_->num_columns(); client_col_idx++) {
-      int tablet_col_idx = mapping.client_to_tablet_idx(client_col_idx);
-      DCHECK_GE(tablet_col_idx, 0);
+      size_t tablet_col_idx = mapping.client_to_tablet_idx(client_col_idx);
       const ColumnSchema& col = tablet_schema_->column(tablet_col_idx);
 
       if (BitmapTest(client_isset_map, client_col_idx)) {
         bool client_set_to_null = client_schema_->has_nullables() &&
           BitmapTest(client_null_map, client_col_idx);
         uint8_t scratch[kLargestTypeSize];
-        uint8_t* val_to_add;
+        uint8_t* val_to_add = nullptr;
         if (!client_set_to_null) {
           RETURN_NOT_OK(ReadColumn(col, scratch));
           val_to_add = scratch;
-        } else {
-
-          if (PREDICT_FALSE(!col.is_nullable())) {
-            return Status::InvalidArgument("NULL value not allowed for non-nullable column",
-                                           col.ToString());
-          }
-          val_to_add = nullptr;
+        } else if (PREDICT_FALSE(!col.is_nullable())) {
+          op->SetFailureStatusOnce(Status::InvalidArgument(
+              "NULL value not allowed for non-nullable column", col.ToString()));
+          RETURN_NOT_OK(ReadColumnAndDiscard(col));
+          continue;
         }
         rcl_encoder.AddColumnUpdate(col, tablet_schema_->column_id(tablet_col_idx), val_to_add);
       }
@@ -486,32 +504,39 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m
 
     if (PREDICT_FALSE(buf.size() == 0)) {
       // No actual column updates specified!
-      return Status::InvalidArgument("No fields updated, key is",
-                                     tablet_schema_->DebugRowKey(rowkey));
+      op->SetFailureStatusOnce(Status::InvalidArgument("No fields updated, key is",
+                                                       tablet_schema_->DebugRowKey(rowkey)));
     }
 
-    // Copy the row-changelist to the arena.
-    uint8_t* rcl_in_arena = reinterpret_cast<uint8_t*>(
-      dst_arena_->AllocateBytesAligned(buf.size(), 8));
-    if (PREDICT_FALSE(rcl_in_arena == nullptr)) {
-      return Status::RuntimeError("Out of memory allocating RCL");
+    if (PREDICT_TRUE(op->result.ok())) {
+      // Copy the row-changelist to the arena.
+      auto rcl_in_arena = reinterpret_cast<uint8_t*>(
+        dst_arena_->AllocateBytesAligned(buf.size(), 8));
+      if (PREDICT_FALSE(rcl_in_arena == nullptr)) {
+        return Status::RuntimeError("Out of memory allocating RCL");
+      }
+      memcpy(rcl_in_arena, buf.data(), buf.size());
+      op->changelist = RowChangeList(Slice(rcl_in_arena, buf.size()));
     }
-    memcpy(rcl_in_arena, buf.data(), buf.size());
-    op->changelist = RowChangeList(Slice(rcl_in_arena, buf.size()));
   } else if (op->type == RowOperationsPB::DELETE) {
-
     // Ensure that no other columns are set.
     for (; client_col_idx < client_schema_->num_columns(); client_col_idx++) {
-      if (BitmapTest(client_isset_map, client_col_idx)) {
-        int tablet_col_idx = mapping.client_to_tablet_idx(client_col_idx);
-        DCHECK_GE(tablet_col_idx, 0);
+      if (PREDICT_FALSE(BitmapTest(client_isset_map, client_col_idx))) {
+        size_t tablet_col_idx = mapping.client_to_tablet_idx(client_col_idx);
         const ColumnSchema& col = tablet_schema_->column(tablet_col_idx);
+        op->SetFailureStatusOnce(Status::InvalidArgument(
+            "DELETE should not have a value for column", col.ToString()));
 
-        return Status::InvalidArgument("DELETE should not have a value for column",
-                                       col.ToString());
+        bool client_set_to_null = client_schema_->has_nullables() &&
+          BitmapTest(client_null_map, client_col_idx);
+        if (!client_set_to_null || !col.is_nullable()) {
+          RETURN_NOT_OK(ReadColumnAndDiscard(col));
+        }
       }
     }
-    op->changelist = RowChangeList::CreateDelete();
+    if (PREDICT_TRUE(op->result.ok())) {
+      op->changelist = RowChangeList::CreateDelete();
+    }
   } else {
     LOG(FATAL) << "Should only call this method with UPDATE or DELETE";
   }
@@ -533,12 +558,13 @@ Status RowOperationsPBDecoder::DecodeSplitRow(const ClientServerMapping& mapping
   }
 
   // Now handle each of the columns passed by the user.
-  for (int client_col_idx = 0; client_col_idx < client_schema_->num_columns(); client_col_idx++) {
+  for (size_t client_col_idx = 0;
+       client_col_idx < client_schema_->num_columns();
+       client_col_idx++) {
     // Look up the corresponding column from the tablet. We use the server-side
     // ColumnSchema object since it has the most up-to-date default, nullability,
     // etc.
-    int tablet_col_idx = mapping.client_to_tablet_idx(client_col_idx);
-    DCHECK_GE(tablet_col_idx, 0);
+    size_t tablet_col_idx = mapping.client_to_tablet_idx(client_col_idx);
     const ColumnSchema& col = tablet_schema_->column(tablet_col_idx);
 
     if (BitmapTest(client_isset_map, client_col_idx)) {
@@ -551,7 +577,7 @@ Status RowOperationsPBDecoder::DecodeSplitRow(const ClientServerMapping& mapping
       } else {
         data = column_slice.data();
       }
-      RETURN_NOT_OK(op->split_row->Set(tablet_col_idx, data));
+      RETURN_NOT_OK(op->split_row->Set(static_cast<int32_t>(tablet_col_idx), data));
     }
   }
   return Status::OK();
@@ -589,6 +615,7 @@ Status RowOperationsPBDecoder::DecodeOperations(vector<DecodedRowOperation>* ops
     RETURN_NOT_OK(DecodeOp<mode>(type, prototype_row_storage, mapping, &op));
     ops->push_back(op);
   }
+
   return Status::OK();
 }
 
diff --git a/src/kudu/common/row_operations.h b/src/kudu/common/row_operations.h
index 41f2af8..c1f0bae 100644
--- a/src/kudu/common/row_operations.h
+++ b/src/kudu/common/row_operations.h
@@ -16,6 +16,7 @@
 // under the License.
 #pragma once
 
+#include <cstddef>
 #include <cstdint>
 #include <memory>
 #include <string>
@@ -70,8 +71,14 @@ struct DecodedRowOperation {
   // For SPLIT_ROW, the partial row to split on.
   std::shared_ptr<KuduPartialRow> split_row;
 
+  // Per-row result status.
+  Status result;
+
   // Stringifies, including redaction when appropriate.
   std::string ToString(const Schema& schema) const;
+
+  // The 'result' member will only be updated the first time this function is called.
+  void SetFailureStatusOnce(Status s);
 };
 
 enum DecoderMode {
@@ -99,6 +106,11 @@ class RowOperationsPBDecoder {
   Status ReadNullBitmap(const uint8_t** null_bm);
   Status GetColumnSlice(const ColumnSchema& col, Slice* slice);
   Status ReadColumn(const ColumnSchema& col, uint8_t* dst);
+  // A non-nullable column still has a cell allocated in the row data by
+  // RowOperationsPBEncoder::Add, even if its value is unused (i.e. set to
+  // NULL). We therefore have to consume that data in order to properly
+  // validate subsequent columns and rows.
+  Status ReadColumnAndDiscard(const ColumnSchema& col);
   bool HasNext() const;
 
   Status DecodeInsertOrUpsert(const uint8_t* prototype_row_storage,
@@ -128,7 +140,7 @@ class RowOperationsPBDecoder {
   Arena* const dst_arena_;
 
   const int bm_size_;
-  const int tablet_row_size_;
+  const size_t tablet_row_size_;
   Slice src_;
 
   DISALLOW_COPY_AND_ASSIGN(RowOperationsPBDecoder);
diff --git a/src/kudu/common/schema.h b/src/kudu/common/schema.h
index 2466c13..3b6c599 100644
--- a/src/kudu/common/schema.h
+++ b/src/kudu/common/schema.h
@@ -876,7 +876,7 @@ class Schema {
       if (base_idx >= 0) {
         const ColumnSchema& base_col_schema = base_schema.column(base_idx);
         // Column present in the Base Schema...
-        if (!col_schema.EqualsType(base_col_schema)) {
+        if (PREDICT_FALSE(!col_schema.EqualsType(base_col_schema))) {
           // ...but with a different type, (TODO: try with an adaptor)
           return Status::InvalidArgument("The column '" + col_schema.name() +
                                          "' must have type " +
diff --git a/src/kudu/tablet/row_op.cc b/src/kudu/tablet/row_op.cc
index 87e94e4..3c084a0 100644
--- a/src/kudu/tablet/row_op.cc
+++ b/src/kudu/tablet/row_op.cc
@@ -23,24 +23,23 @@
 #include <glog/logging.h>
 
 #include "kudu/common/wire_protocol.h"
-#include "kudu/tablet/rowset.h"
 #include "kudu/tablet/tablet.pb.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
 
 using kudu::pb_util::SecureDebugString;
 
 namespace kudu {
 
-class Status;
-
 namespace tablet {
 
-RowOp::RowOp(DecodedRowOperation decoded_op)
-    : decoded_op(std::move(decoded_op)),
-      orig_result_from_log_(nullptr) {
-}
-
-RowOp::~RowOp() {
+RowOp::RowOp(DecodedRowOperation op)
+    : decoded_op(std::move(op)) {
+  if (!decoded_op.result.ok()) {
+    SetFailed(decoded_op.result);
+    // This row has been validated as invalid in the prior phase.
+    validated = true;
+  }
 }
 
 void RowOp::SetFailed(const Status& s) {
diff --git a/src/kudu/tablet/row_op.h b/src/kudu/tablet/row_op.h
index c156546..5f2124f 100644
--- a/src/kudu/tablet/row_op.h
+++ b/src/kudu/tablet/row_op.h
@@ -22,6 +22,8 @@
 #include "kudu/common/row_operations.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/tablet/lock_manager.h"
+#include "kudu/tablet/rowset.h"
+#include "kudu/tablet/tablet.pb.h"
 
 namespace kudu {
 
@@ -30,16 +32,11 @@ class Status;
 
 namespace tablet {
 
-class OperationResultPB;
-class RowSet;
-class RowSetKeyProbe;
-
 // Structure tracking the progress of a single row operation within a WriteTransaction.
 struct RowOp {
  public:
-  explicit RowOp(DecodedRowOperation decoded_op);
-  RowOp();
-  ~RowOp();
+  explicit RowOp(DecodedRowOperation op);
+  ~RowOp() = default;
 
   // Functions to set the result of the mutation.
   // Only one of the following four functions must be called, at most once.
@@ -57,7 +54,7 @@ struct RowOp {
   //
   // This pointer must stay live as long as this RowOp.
   void set_original_result_from_log(const OperationResultPB* orig_result) {
-    orig_result_from_log_ = orig_result;
+    orig_result_from_log = orig_result;
   }
 
   bool has_row_lock() const {
@@ -75,7 +72,7 @@ struct RowOp {
 
   // If this operation is being replayed from the log, set to the original
   // result. Otherwise nullptr.
-  const OperationResultPB* orig_result_from_log_;
+  const OperationResultPB* orig_result_from_log = nullptr;
 
   // The key probe structure contains the row key in both key-encoded and
   // ContiguousRow formats, bloom probe structure, etc. This is set during
@@ -86,7 +83,7 @@ struct RowOp {
   // phase.
   ScopedRowLock row_lock;
 
-  // Flag whether this op has already been validated by Tablet::ValidateOp.
+  // Flag whether this op has already been validated.
   bool validated = false;
 
   // Flag whether this op has already had 'present_in_rowset' filled in.
@@ -100,7 +97,7 @@ struct RowOp {
   // checked and found not to be alive in any RowSet.
   RowSet* present_in_rowset = nullptr;
 
-  // The result of the operation, after Apply.
+  // The result of the operation.
   gscoped_ptr<OperationResultPB> result;
 };
 
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 49e7b85..c97d93d 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -458,6 +458,7 @@ Status Tablet::AcquireRowLocks(WriteTransactionState* tx_state) {
                "num_locks", tx_state->row_ops().size());
   TRACE("Acquiring locks for $0 operations", tx_state->row_ops().size());
   for (RowOp* op : tx_state->row_ops()) {
+    if (op->has_result()) continue;
     RETURN_NOT_OK(AcquireLockForOp(tx_state, op));
   }
   TRACE("Locks acquired");
@@ -710,8 +711,8 @@ Status Tablet::ApplyUpsertAsUpdate(const IOContext* io_context,
 vector<RowSet*> Tablet::FindRowSetsToCheck(const RowOp* op,
                                            const TabletComponents* comps) {
   vector<RowSet*> to_check;
-  if (PREDICT_TRUE(!op->orig_result_from_log_)) {
-    // TODO: could iterate the rowsets in a smart order
+  if (PREDICT_TRUE(!op->orig_result_from_log)) {
+    // TODO(yingchun): could iterate the rowsets in a smart order
     // based on recent statistics - eg if a rowset is getting
     // updated frequently, pick that one first.
     comps->rowsets->FindRowSetsWithKeyInRange(op->key_probe->encoded_key_slice(),
@@ -728,7 +729,7 @@ vector<RowSet*> Tablet::FindRowSetsToCheck(const RowOp* op,
 
   // If we are replaying an operation during bootstrap, then we already have a
   // COMMIT message which tells us specifically which memory store to apply it to.
-  for (const auto& store : op->orig_result_from_log_->mutated_stores()) {
+  for (const auto& store : op->orig_result_from_log->mutated_stores()) {
     if (store.has_mrs_id()) {
       to_check.push_back(comps->memrowset.get());
     } else {
@@ -804,7 +805,7 @@ Status Tablet::BulkCheckPresence(const IOContext* io_context, WriteTransactionSt
     RowOp* op = row_ops_base[i];
     // If the op already failed in validation, or if we've got the original result
     // filled in already during replay, then we don't need to consult the RowSetTree.
-    if (op->has_result() || op->orig_result_from_log_) continue;
+    if (op->has_result() || op->orig_result_from_log) continue;
     keys_and_indexes.emplace_back(op->key_probe->encoded_key_slice(), i);
   }
 
diff --git a/src/kudu/tablet/transactions/write_transaction.cc b/src/kudu/tablet/transactions/write_transaction.cc
index abd7de0..421f68c 100644
--- a/src/kudu/tablet/transactions/write_transaction.cc
+++ b/src/kudu/tablet/transactions/write_transaction.cc
@@ -166,7 +166,7 @@ Status WriteTransaction::Prepare() {
 
   Status s = tablet->DecodeWriteOperations(&client_schema, state());
   if (!s.ok()) {
-    // TODO: is MISMATCHED_SCHEMA always right here? probably not.
+    // TODO(unknown): is MISMATCHED_SCHEMA always right here? probably not.
     state()->completion_callback()->set_error(s, TabletServerErrorPB::MISMATCHED_SCHEMA);
     return s;
   }
@@ -203,6 +203,22 @@ Status WriteTransaction::Start() {
   return Status::OK();
 }
 
+void WriteTransaction::UpdatePerRowErrors() {
+  // Add per-row errors to the result, update metrics.
+  for (int i = 0; i < state()->row_ops().size(); ++i) {
+    const RowOp* op = state()->row_ops()[i];
+    if (op->result->has_failed_status()) {
+      // Replicas disregard the per-row errors, for now.
+      // TODO(unknown): check the per-row errors against the leader's, at least in debug mode.
+      WriteResponsePB::PerRowErrorPB* error = state()->response()->add_per_row_errors();
+      error->set_row_index(i);
+      error->mutable_error()->CopyFrom(op->result->failed_status());
+    }
+
+    state()->UpdateMetricsForOp(*op);
+  }
+}
+
 // FIXME: Since this is called as a void in a thread-pool callback,
 // it seems pointless to return a Status!
 Status WriteTransaction::Apply(gscoped_ptr<CommitMsg>* commit_msg) {
@@ -220,20 +236,7 @@ Status WriteTransaction::Apply(gscoped_ptr<CommitMsg>* commit_msg) {
   RETURN_NOT_OK(tablet->ApplyRowOperations(state()));
   TRACE("APPLY: Finished.");
 
-  // Add per-row errors to the result, update metrics.
-  int i = 0;
-  for (const RowOp* op : state()->row_ops()) {
-    if (op->result->has_failed_status()) {
-      // Replicas disregard the per row errors, for now
-      // TODO check the per-row errors against the leader's, at least in debug mode
-      WriteResponsePB::PerRowErrorPB* error = state()->response()->add_per_row_errors();
-      error->set_row_index(i);
-      error->mutable_error()->CopyFrom(op->result->failed_status());
-    }
-
-    state()->UpdateMetricsForOp(*op);
-    i++;
-  }
+  UpdatePerRowErrors();
 
   // Create the Commit message
   commit_msg->reset(new CommitMsg());
diff --git a/src/kudu/tablet/transactions/write_transaction.h b/src/kudu/tablet/transactions/write_transaction.h
index bba172c..b6b6f5c 100644
--- a/src/kudu/tablet/transactions/write_transaction.h
+++ b/src/kudu/tablet/transactions/write_transaction.h
@@ -325,6 +325,8 @@ class WriteTransaction : public Transaction {
   std::string ToString() const override;
 
  private:
+  void UpdatePerRowErrors();
+
   // this transaction's start time
   MonoTime start_time_;
 


Mime
View raw message