impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From he...@apache.org
Subject [1/5] incubator-impala git commit: IMPALA-3788: Support for Kudu 'read-your-writes' consistency
Date Fri, 28 Oct 2016 20:02:35 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master c24e9da91 -> 3b8a0891d


IMPALA-3788: Support for Kudu 'read-your-writes' consistency

Kudu provides an API to get and set a client's 'latest observed
timestamp'. A client that performs inserts can capture this
timestamp and pass it to another client before that client
reads, ensuring that all data as of that timestamp is visible.
This adds support for this feature _for reads within a
session_ by capturing the latest observed timestamp when the
KuduTableSink is sending its last update to the coordinator.
The timestamp is sent with other post-write information, and
is aggregated (i.e. taking the max) at the coordinator. The
max is stored in the session, and that value is then set in
the Kudu client on future scans.

This is being tested by running the Kudu tests after
removing delays that were introduced to work around the
issue that reads might not be visible after a write. Before
this change, if there were no delay, inconsistent results
could be returned.

Change-Id: I6bcb5fc218ad4ab935343a55b2188441d8c7dfbd
Reviewed-on: http://gerrit.cloudera.org:8080/4779
Reviewed-by: Matthew Jacobs <mj@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c01644bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c01644bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c01644bc

Branch: refs/heads/master
Commit: c01644bcb9746c440cc6fd425a564ec40ea6d27c
Parents: c24e9da
Author: Matthew Jacobs <mj@cloudera.com>
Authored: Thu Oct 20 15:21:53 2016 -0700
Committer: Internal Jenkins <cloudera-hudson@gerrit.cloudera.org>
Committed: Wed Oct 26 21:11:06 2016 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-scan-node.cc              |  5 +++++
 be/src/exec/kudu-table-sink.cc             |  3 +++
 be/src/runtime/coordinator.cc              | 11 +++++++++++
 be/src/runtime/coordinator.h               |  5 +++++
 be/src/service/impala-hs2-server.cc        |  1 +
 be/src/service/impala-server.cc            |  3 +++
 be/src/service/impala-server.h             |  4 ++++
 be/src/service/query-exec-state.cc         |  9 +++++++++
 common/thrift/ImpalaInternalService.thrift |  7 +++++++
 tests/common/impala_test_suite.py          |  4 +---
 tests/query_test/test_kudu.py              |  6 ++----
 11 files changed, 51 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 4a162a8..1aef102 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -128,6 +128,11 @@ Status KuduScanNode::Open(RuntimeState* state) {
 
   KUDU_RETURN_IF_ERROR(b.Build(&client_), "Unable to create Kudu client");
 
+  uint64_t latest_ts = static_cast<uint64_t>(
+      max<int64_t>(0, state->query_ctx().session.kudu_latest_observed_ts));
+  VLOG_RPC << "Latest observed Kudu timestamp: " << latest_ts;
+  if (latest_ts > 0) client_->SetLatestObservedTimestamp(latest_ts);
+
   KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc->table_name(), &table_),
       "Unable to open Kudu table");
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 70a74a9..a9beb29 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -278,6 +278,7 @@ Status KuduTableSink::CheckForErrors(RuntimeState* state) {
 
   // Get the pending errors from the Kudu session. If errors overflowed the error buffer
   // we can't be sure all errors can be ignored, so an error status will be reported.
+  // TODO: Make sure Kudu handles conflict errors properly if IGNORE is set (KUDU-1563).
   bool error_overflow = false;
   session_->GetPendingErrors(&errors, &error_overflow);
   if (UNLIKELY(error_overflow)) {
@@ -327,6 +328,8 @@ Status KuduTableSink::FlushFinal(RuntimeState* state) {
   Status status = CheckForErrors(state);
   (*state->per_partition_status())[ROOT_PARTITION_KEY].__set_num_modified_rows(
       rows_written_->value() - kudu_error_counter_->value());
+  (*state->per_partition_status())[ROOT_PARTITION_KEY].__set_kudu_latest_observed_ts(
+      client_->GetLatestObservedTimestamp());
   return status;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 0f41deb..f991e1c 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -1670,6 +1670,8 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
       TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
       status->__set_num_modified_rows(
           status->num_modified_rows + partition.second.num_modified_rows);
+      status->__set_kudu_latest_observed_ts(std::max(
+          partition.second.kudu_latest_observed_ts, status->kudu_latest_observed_ts));
       status->__set_id(partition.second.id);
       status->__set_partition_base_dir(partition.second.partition_base_dir);
 
@@ -1736,6 +1738,15 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
   return Status::OK();
 }
 
+uint64_t Coordinator::GetLatestKuduInsertTimestamp() const {  // Max across partitions.
+  uint64_t max_ts = 0;  // Remains 0 when no Kudu timestamp was reported.
+  for (const auto& entry : per_partition_status_) {
+    max_ts = std::max(max_ts,  // i64 field holds an unsigned value.
+        static_cast<uint64_t>(entry.second.kudu_latest_observed_ts));
+  }
+  return max_ts;
+}
+
 RuntimeState* Coordinator::runtime_state() {
   return executor_ == NULL ? NULL : executor_->runtime_state();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 9904def..066d532 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -177,6 +177,11 @@ class Coordinator {
   /// This is safe to call only after Wait()
   const PartitionStatusMap& per_partition_status() { return per_partition_status_; }
 
+  /// Returns the latest Kudu timestamp observed across any backends where DML into Kudu
+  /// was executed, or 0 if there were no Kudu timestamps reported.
+  /// This should only be called after Wait().
+  uint64_t GetLatestKuduInsertTimestamp() const;
+
   /// Gathers all updates to the catalog required once this query has completed execution.
   /// Returns true if a catalog update is required, false otherwise.
   /// Must only be called after Wait()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 488a1ee..05a58a4 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -286,6 +286,7 @@ void ImpalaServer::OpenSession(TOpenSessionResp& return_val,
   state->network_address = ThriftServer::GetThreadConnectionContext()->network_address;
   state->last_accessed_ms = UnixMillis();
   state->hs2_version = min(MAX_SUPPORTED_HS2_VERSION, request.client_protocol);
+  state->kudu_latest_observed_ts = 0;
 
   // If the username was set by a lower-level transport, use it.
   const ThriftServer::Username& username =

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index bf83eec..4886bdf 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1159,6 +1159,7 @@ void ImpalaServer::SessionState::ToThrift(const TUniqueId& session_id,
   // proxy user is authorized to delegate as this user.
   if (!do_as_user.empty()) state->__set_delegated_user(do_as_user);
   state->network_address = network_address;
+  state->__set_kudu_latest_observed_ts(kudu_latest_observed_ts);
 }
 
 void ImpalaServer::CancelFromThreadPool(uint32_t thread_id,
@@ -1625,6 +1626,8 @@ void ImpalaServer::ConnectionStart(
     session_state->session_type = TSessionType::BEESWAX;
     session_state->network_address = connection_context.network_address;
     session_state->default_query_options = default_query_options_;
+    session_state->kudu_latest_observed_ts = 0;
+
     // If the username was set by a lower-level transport, use it.
     if (!connection_context.username.empty()) {
       session_state->connected_user = connection_context.username;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 53f3384..7d0f7c9 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -708,6 +708,10 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
     /// Time the session was last accessed.
     int64_t last_accessed_ms;
 
+    /// The latest Kudu timestamp observed after DML operations executed within this
+    /// session.
+    uint64_t kudu_latest_observed_ts;
+
     /// Number of RPCs concurrently accessing this session state. Used to detect when a
     /// session may be correctly expired after a timeout (when ref_count == 0). Typically
     /// at most one RPC will be issued against a session at a time, but clients may do

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index 3dc9d6c..7c40d37 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -537,6 +537,15 @@ void ImpalaServer::QueryExecState::Done() {
   query_events_->MarkEvent("Unregister query");
 
   if (coord_.get() != NULL) {
+    // Update latest observed Kudu timestamp stored in the session.
+    uint64_t latest_kudu_ts = coord_->GetLatestKuduInsertTimestamp();
+    if (latest_kudu_ts > 0) {
+      VLOG_RPC << "Updating session latest observed Kudu timestamp: " << latest_kudu_ts;
+      lock_guard<mutex> session_lock(session_->lock);
+      session_->kudu_latest_observed_ts = std::max<uint64_t>(
+          session_->kudu_latest_observed_ts, latest_kudu_ts);
+    }
+
     // Release any reserved resources.
     Status status = exec_env_->scheduler()->Release(schedule_.get());
     if (!status.ok()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index e9c3119..15c0020 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -240,6 +240,9 @@ struct TSessionState {
 
   // Client network address
   4: required Types.TNetworkAddress network_address
+
+  // If set, the latest Kudu timestamp observed within this session.
+  7: optional i64 kudu_latest_observed_ts;
 }
 
 // Client request including stmt to execute and query options.
@@ -430,6 +433,10 @@ struct TInsertPartitionStatus {
 
   // Fully qualified URI to the base directory for this partition.
   4: required string partition_base_dir
+
+  // The latest observed Kudu timestamp reported by the KuduSession at this partition.
+  // This value is an unsigned int64.
+  5: optional i64 kudu_latest_observed_ts
 }
 
 // The results of an INSERT query, sent to the coordinator as part of

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/tests/common/impala_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 2c0f3c6..df88148 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -235,7 +235,7 @@ class ImpalaTestSuite(BaseTestSuite):
 
 
   def run_test_case(self, test_file_name, vector, use_db=None, multiple_impalad=False,
-      encoding=None, wait_secs_between_stmts=None):
+      encoding=None):
     """
     Runs the queries in the specified test based on the vector values
 
@@ -318,8 +318,6 @@ class ImpalaTestSuite(BaseTestSuite):
           if set_pattern_match != None:
             query_options_changed.append(set_pattern_match.groups()[0])
           result = self.__execute_query(target_impalad_client, query, user=user)
-          if wait_secs_between_stmts:
-            time.sleep(wait_secs_between_stmts)
       except Exception as e:
         if 'CATCH' in test_section:
           self.__verify_exceptions(test_section['CATCH'], str(e), use_db)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/tests/query_test/test_kudu.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index c22de3e..7ccb8a2 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -39,12 +39,10 @@ class TestKuduOperations(KuduTestSuite):
   """
 
   def test_kudu_scan_node(self, vector, unique_database):
-    self.run_test_case('QueryTest/kudu-scan-node', vector, use_db=unique_database,
-        wait_secs_between_stmts=1)
+    self.run_test_case('QueryTest/kudu-scan-node', vector, use_db=unique_database)
 
   def test_kudu_crud(self, vector, unique_database):
-    self.run_test_case('QueryTest/kudu_crud', vector, use_db=unique_database,
-        wait_secs_between_stmts=1)
+    self.run_test_case('QueryTest/kudu_crud', vector, use_db=unique_database)
 
   def test_kudu_partition_ddl(self, vector, unique_database):
     self.run_test_case('QueryTest/kudu_partition_ddl', vector, use_db=unique_database)


Mime
View raw message