kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [kudu] 01/02: [client] extra validation on rows upon Session::Apply()
Date Thu, 21 Feb 2019 21:56:16 GMT
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit cdb5a0f3a0d8659225bdb4ff6437a0cf3448d3c6
Author: Alexey Serbin <alexey@apache.org>
AuthorDate: Fri Feb 15 22:59:52 2019 -0800

    [client] extra validation on rows upon Session::Apply()
    
    This patch adds an extra validation step at the C++ client side for
    INSERT and UPSERT write operations prior to sending them to tablet servers.
    
    More specifically, with this patch the Kudu C++ client verifies that
    each non-nullable column without default value is set for INSERT and
    UPSERT write operations before submitting WriteRequestPB to a tablet
    server.  The verification is done upon calling KuduSession::Apply().
    
    Also, a new test was added to cover the new functionality.
    
    Change-Id: I822f20f3242df7983c63ac146f298acfcfbafc68
    Reviewed-on: http://gerrit.cloudera.org:8080/12512
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/client/client-test.cc      | 87 +++++++++++++++++++++++++++++++++++++
 src/kudu/client/session-internal.cc | 68 +++++++++++++++++++++++++----
 src/kudu/client/session-internal.h  |  4 ++
 src/kudu/client/write_op.h          |  1 +
 4 files changed, 151 insertions(+), 9 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 1e3682b..db42248 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -5875,6 +5875,93 @@ TEST_F(ClientTest, TestRetrieveAuthzTokenInParallel) {
   ASSERT_LT(num_reqs, kThreads);
 }
 
+// This test verifies that rows with column schema violations such as
+// unset non-nullable columns (with no default value) are detected at the client
+// side while calling Apply() for corresponding write operations. So, that sort
+// of schema violation can be detected before sending an RPC to a tablet server.
+TEST_F(ClientTest, WritingRowsWithUnsetNonNullableColumns) {
+  // Make sure if all non-nullable columns (without defaults) are set for an
+  // insert operation, the operation should be successfully applied and flushed.
+  {
+    shared_ptr<KuduSession> session = client_->NewSession();
+    ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+    unique_ptr<KuduInsert> op(client_table_->NewInsert());
+    auto* row = op->mutable_row();
+    ASSERT_OK(row->SetInt32("key", 0));
+    // Set the non-nullable column without default.
+    ASSERT_OK(row->SetInt32("int_val", 1));
+    // Even if the non-nullable column with default 'non_null_with_default'
+    // is not set, apply (and underlying Flush()) should succeed.
+    ASSERT_OK(session->Apply(op.release()));
+  }
+
+  // Of course, update write operations do not have to have all non-nullable
+  // columns specified.
+  {
+    shared_ptr<KuduSession> session = client_->NewSession();
+    ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+    unique_ptr<KuduUpdate> op(client_table_->NewUpdate());
+    auto* row = op->mutable_row();
+    ASSERT_OK(row->SetInt32("key", 0));
+    ASSERT_OK(row->SetInt32("non_null_with_default", 1));
+    ASSERT_OK(session->Apply(op.release()));
+  }
+
+  // Make sure if a non-nullable column (without defaults) is not set for an
+  // insert operation, the operation cannot be applied.
+  {
+    shared_ptr<KuduSession> session = client_->NewSession();
+    ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+    unique_ptr<KuduInsert> op(client_table_->NewInsert());
+    auto* row = op->mutable_row();
+    ASSERT_OK(row->SetInt32("key", 1));
+    // The non-nullable column with default 'non_null_with_default' is set.
+    ASSERT_OK(row->SetInt32("non_null_with_default", 1));
+    // The non-nullable column 'int_val' without default is not set, so
+    // Apply() should fail.
+    const auto apply_status = session->Apply(op.release());
+    ASSERT_TRUE(apply_status.IsIllegalState()) << apply_status.ToString();
+    ASSERT_STR_CONTAINS(apply_status.ToString(),
+                        "non-nullable column 'int_val' is not set");
+    // Flush() should return an error.
+    const auto flush_status = session->Flush();
+    ASSERT_TRUE(flush_status.IsIOError()) << flush_status.ToString();
+    ASSERT_STR_CONTAINS(flush_status.ToString(),
+                        "IO error: Some errors occurred");
+  }
+
+  // Make sure if a non-nullable column (without defaults) is not set for an
+  // upsert operation, the operation cannot be applied.
+  {
+    shared_ptr<KuduSession> session(client_->NewSession());
+    ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+    unique_ptr<KuduUpsert> op(client_table_->NewUpsert());
+    auto* row = op->mutable_row();
+    ASSERT_OK(row->SetInt32("key", 1));
+    // The non-nullable column 'int_val' without default is not set, so
+    // Apply() should fail.
+    const auto apply_status = session->Apply(op.release());
+    ASSERT_TRUE(apply_status.IsIllegalState()) << apply_status.ToString();
+    ASSERT_STR_CONTAINS(apply_status.ToString(),
+                        "non-nullable column 'int_val' is not set");
+    // Of course, Flush() should fail as well.
+    const auto flush_status = session->Flush();
+    ASSERT_TRUE(flush_status.IsIOError()) << flush_status.ToString();
+    ASSERT_STR_CONTAINS(flush_status.ToString(),
+                        "IO error: Some errors occurred");
+  }
+
+  // To delete a row, only the key is necessary.
+  {
+    shared_ptr<KuduSession> session = client_->NewSession();
+    ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+    unique_ptr<KuduDelete> op(client_table_->NewDelete());
+    auto* row = op->mutable_row();
+    ASSERT_OK(row->SetInt32("key", 0));
+    ASSERT_OK(session->Apply(op.release()));
+  }
+}
+
 // Client test that assigns locations to clients and tablet servers.
 // For now, assigns a uniform location to all clients and tablet servers.
 class ClientWithLocationTest : public ClientTest {
diff --git a/src/kudu/client/session-internal.cc b/src/kudu/client/session-internal.cc
index a4a3092..f124596 100644
--- a/src/kudu/client/session-internal.cc
+++ b/src/kudu/client/session-internal.cc
@@ -31,12 +31,14 @@
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
 #include "kudu/client/write_op.h"
 #include "kudu/common/partial_row.h"
+#include "kudu/common/schema.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/util/logging.h"
 
 using std::unique_ptr;
+using strings::Substitute;
 
 namespace kudu {
 
@@ -166,8 +168,8 @@ Status KuduSession::Data::SetBufferBytesLimit(size_t size) {
 Status KuduSession::Data::SetBufferFlushWatermark(int watermark_pct) {
   if (watermark_pct < 0 || watermark_pct > 100) {
     return Status::InvalidArgument(
-        strings::Substitute("$0: watermark must be between 0 and 100 inclusive",
-                            watermark_pct));
+        Substitute("$0: watermark must be between 0 and 100 inclusive",
+                   watermark_pct));
   }
   std::lock_guard<Mutex> l(mutex_);
   if (HasPendingOperationsUnlocked()) {
@@ -326,6 +328,57 @@ MonoDelta KuduSession::Data::FlushCurrentBatcher(const MonoDelta& max_age) {
   return time_left;
 }
 
+namespace {
+// Check if the primary key is set for the write operation.
+Status CheckForPrimaryKey(const KuduWriteOperation& op) {
+  if (PREDICT_FALSE(!op.row().IsKeySet())) {
+    return Status::IllegalState("Key not specified", KUDU_REDACT(op.ToString()));
+  }
+  return Status::OK();
+}
+
+// Check if the values for the non-nullable columns are present.
+Status CheckForNonNullableColumns(const KuduWriteOperation& op) {
+  const auto& row = op.row();
+  const auto* schema = row.schema();
+  const auto num_columns = schema->num_columns();
+  for (auto idx = 0; idx < num_columns; ++idx) {
+    const ColumnSchema& col = schema->column(idx);
+    if (!col.is_nullable() && !col.has_write_default() &&
+        !row.IsColumnSet(idx)) {
+      return Status::IllegalState(Substitute(
+          "non-nullable column '$0' is not set", schema->column(idx).name()),
+          KUDU_REDACT(op.ToString()));
+    }
+  }
+  return Status::OK();
+}
+} // anonymous namespace
+
+#define RETURN_NOT_OK_ADD_ERROR(_func, _op, _error_collector) \
+  do { \
+    const auto& s = (_func)(*(_op)); \
+    if (PREDICT_FALSE(!s.ok())) { \
+      (_error_collector)->AddError( \
+          unique_ptr<KuduError>(new KuduError((_op), s))); \
+      return s; \
+    } \
+  } while (false)
+
+Status KuduSession::Data::ValidateWriteOperation(KuduWriteOperation* op) const {
+  RETURN_NOT_OK_ADD_ERROR(CheckForPrimaryKey, op, error_collector_);
+  switch (op->type()) {
+    case KuduWriteOperation::INSERT:
+    case KuduWriteOperation::UPSERT:
+      RETURN_NOT_OK_ADD_ERROR(CheckForNonNullableColumns, op, error_collector_);
+      break;
+    default:
+      // Nothing else to validate for other types of write operations.
+      break;
+  }
+  return Status::OK();
+}
+
 // This method takes ownership over the specified write operation. On the return
 // from this this method, the operation must end up either in the corresponding
 // batcher (success path) or in the error collector (failure path). Otherwise
@@ -334,11 +387,8 @@ Status KuduSession::Data::ApplyWriteOp(KuduWriteOperation* write_op) {
   if (PREDICT_FALSE(!write_op)) {
     return Status::InvalidArgument("NULL operation");
   }
-  if (PREDICT_FALSE(!write_op->row().IsKeySet())) {
-    Status status = Status::IllegalState("Key not specified", KUDU_REDACT(write_op->ToString()));
-    error_collector_->AddError(unique_ptr<KuduError>(new KuduError(write_op, status)));
-    return status;
-  }
+
+  RETURN_NOT_OK(ValidateWriteOperation(write_op));
 
   // Get 'wire size' of the write operation.
   const int64_t required_size = Batcher::GetOperationSizeInBuffer(write_op);
@@ -358,7 +408,7 @@ Status KuduSession::Data::ApplyWriteOp(KuduWriteOperation* write_op) {
   // verify that the single operation can fit into an empty buffer
   // given the restriction on the buffer size.
   if (PREDICT_FALSE(required_size > max_size)) {
-    Status s = Status::Incomplete(strings::Substitute(
+    Status s = Status::Incomplete(Substitute(
           "buffer size limit is too small to fit operation: "
           "required $0, size limit $1",
           required_size, max_size));
@@ -403,7 +453,7 @@ Status KuduSession::Data::ApplyWriteOp(KuduWriteOperation* write_op) {
         condition_.Wait();
       }
     } else if (PREDICT_FALSE(buffer_bytes_used_ + required_size > max_size)) {
-      Status s = Status::Incomplete(strings::Substitute(
+      Status s = Status::Incomplete(Substitute(
           "not enough mutation buffer space remaining for operation: "
           "required additional $0 when $1 of $2 already used",
           required_size, buffer_bytes_used_, max_size));
diff --git a/src/kudu/client/session-internal.h b/src/kudu/client/session-internal.h
index c5a4669..cbad1b3 100644
--- a/src/kudu/client/session-internal.h
+++ b/src/kudu/client/session-internal.h
@@ -155,6 +155,10 @@ class KuduSession::Data {
   // This method is used by tests only.
   size_t GetBatchersCountForTests() const;
 
+  // Run sanity checks on a write operation: check for the presence of the
+  // primary key and perform other validations with regard to the column schema.
+  Status ValidateWriteOperation(KuduWriteOperation* op) const;
+
   // This constant represents a meaningful name for the first argument in
   // expressions like FlushCurrentBatcher(1, cbk): this is the watermark
   // corresponding to 1 byte of data. This watermark level is the minimum
diff --git a/src/kudu/client/write_op.h b/src/kudu/client/write_op.h
index 0d6a9e6..a0a5f61 100644
--- a/src/kudu/client/write_op.h
+++ b/src/kudu/client/write_op.h
@@ -115,6 +115,7 @@ class KUDU_EXPORT KuduWriteOperation {
   friend class internal::Batcher;
   friend class internal::WriteRpc;
   friend class internal::ErrorCollector;
+  friend class KuduSession;
 
   // Create and encode the key for this write (key must be set)
   //


Mime
View raw message