kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [3/4] kudu git commit: KUDU-1679 Propagate timestamps for scans
Date Sun, 20 Nov 2016 18:47:03 GMT
KUDU-1679 Propagate timestamps for scans

Added the 'propagated_timestamp' field to the ScanResponsePB message.
It's always set by the tablet server that successfully processed the
incoming NewScanRequestPB message.

Also, added a unit test to verify the presence of the field in tablet
server response messages.

Change-Id: I4d79024b088ea88fd194cabcb61e640f66326264
Reviewed-on: http://gerrit.cloudera.org:8080/5099
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dralves@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/06bb52d2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/06bb52d2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/06bb52d2

Branch: refs/heads/master
Commit: 06bb52d2acc6d311144aa905101ec5d846096611
Parents: 6a0ccdf
Author: Alexey Serbin <aserbin@cloudera.com>
Authored: Tue Nov 15 19:16:30 2016 -0800
Committer: David Ribeiro Alves <dralves@apache.org>
Committed: Sat Nov 19 17:45:13 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/client-test.cc                  | 134 ++++++++++++++++---
 src/kudu/client/scan_configuration.cc           |   3 +
 src/kudu/client/scan_configuration.h            |  16 ++-
 src/kudu/client/scan_token-internal.cc          |  39 +++---
 src/kudu/client/scanner-internal.cc             |  38 ++++--
 src/kudu/integration-tests/consistency-itest.cc |   2 +-
 src/kudu/tserver/tablet_server-test.cc          |  23 +++-
 src/kudu/tserver/tablet_service.cc              |   2 +
 src/kudu/tserver/tserver.proto                  |   4 +
 9 files changed, 204 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/06bb52d2/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index a692b8f..e780017 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -587,6 +587,33 @@ class ClientTest : public KuduTest {
                          const vector<size_t>& string_sizes,
                          CpuTimes* elapsed);
 
+  // Perform a scan on the given table in the specified read mode, counting
+  // the number of returned rows. The 'scan_ts' parameter is effective only in
+  // READ_AT_SNAPSHOT mode, where the snapshot is set to 'scan_ts + 1'.
+  // All scans are performed against the tablet's leader server.
+  static Status CountRowsOnLeaders(KuduTable* table,
+                                   KuduScanner::ReadMode scan_mode,
+                                   uint64_t scan_ts,
+                                   size_t* row_count) {
+    KuduScanner scanner(table);
+    RETURN_NOT_OK(scanner.SetSelection(KuduClient::LEADER_ONLY));
+    RETURN_NOT_OK(scanner.SetReadMode(scan_mode));
+    if (scan_mode == KuduScanner::READ_AT_SNAPSHOT) {
+      RETURN_NOT_OK(scanner.SetSnapshotRaw(scan_ts + 1));
+    }
+    RETURN_NOT_OK(scanner.Open());
+    KuduScanBatch batch;
+    size_t count = 0;
+    while (scanner.HasMoreRows()) {
+      RETURN_NOT_OK(scanner.NextBatch(&batch));
+      count += batch.NumRows();
+    }
+    if (row_count) {
+      *row_count = count;
+    }
+    return Status::OK();
+  }
+
   enum WhichServerToKill {
     DEAD_MASTER,
     DEAD_TSERVER
@@ -3755,30 +3782,107 @@ TEST_F(ClientTest, TestCreateTableWithInvalidEncodings) {
                       "DICT_ENCODING not supported for type INT32");
 }
 
+// Check the behavior of the latest observed timestamp when performing
+// write and read operations.
 TEST_F(ClientTest, TestLatestObservedTimestamp) {
   // Check that a write updates the latest observed timestamp.
-  uint64_t ts0 = client_->GetLatestObservedTimestamp();
-  ASSERT_EQ(ts0, KuduClient::kNoTimestamp);
+  const uint64_t ts0 = client_->GetLatestObservedTimestamp();
+  ASSERT_EQ(KuduClient::kNoTimestamp, ts0);
   ASSERT_NO_FATAL_FAILURE(InsertTestRows(client_table_.get(), 1, 0));
-  uint64_t ts1 = client_->GetLatestObservedTimestamp();
+  const uint64_t ts1 = client_->GetLatestObservedTimestamp();
   ASSERT_NE(ts0, ts1);
 
-  // Check that the timestamp of the previous write will be observed by another
-  // client performing a snapshot scan at that timestamp.
+  // Check that the latest observed timestamp moves forward when
+  // a scan is performed by the same or another client, even if reading/scanning
+  // at the fixed timestamp observed at the prior insert.
+  uint64_t latest_ts = ts1;
   shared_ptr<KuduClient> client;
-  shared_ptr<KuduTable> table;
   ASSERT_OK(KuduClientBuilder()
       .add_master_server_addr(cluster_->mini_master()->bound_rpc_addr().ToString())
       .Build(&client));
-  ASSERT_EQ(client->GetLatestObservedTimestamp(), KuduClient::kNoTimestamp);
-  ASSERT_OK(client->OpenTable(client_table_->name(), &table));
-  KuduScanner scanner(table.get());
-  ASSERT_OK(scanner.SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
-  ASSERT_OK(scanner.SetSnapshotRaw(ts1));
-  ASSERT_OK(scanner.Open());
-  scanner.Close();
-  uint64_t ts2 = client->GetLatestObservedTimestamp();
-  ASSERT_EQ(ts1, ts2);
+  vector<shared_ptr<KuduClient>> clients{ client_, client };
+  for (auto& c : clients) {
+    if (c != client_) {
+      // Check that the new client has no latest observed timestamp.
+      ASSERT_EQ(KuduClient::kNoTimestamp, c->GetLatestObservedTimestamp());
+    }
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(c->OpenTable(client_table_->name(), &table));
+    static const KuduScanner::ReadMode kReadModes[] = {
+      KuduScanner::READ_AT_SNAPSHOT,
+      KuduScanner::READ_LATEST,
+    };
+    for (auto read_mode : kReadModes) {
+      KuduScanner scanner(table.get());
+      ASSERT_OK(scanner.SetReadMode(read_mode));
+      if (read_mode == KuduScanner::READ_AT_SNAPSHOT) {
+        ASSERT_OK(scanner.SetSnapshotRaw(ts1));
+      }
+      ASSERT_OK(scanner.Open());
+    }
+    const uint64_t ts = c->GetLatestObservedTimestamp();
+    ASSERT_LT(latest_ts, ts);
+    latest_ts = ts;
+  }
+}
+
+// Insert a bunch of rows, delete a row, and then insert the row back.
+// Run several scans and check the results are consistent with the
+// specified timestamps:
+//   * before the initial insert and right at the initial insert
+//   * at the snapshot corresponding to the timestamp when the row was deleted
+//   * at the snapshot corresponding to the timestamp when it was inserted back
+//   * read the latest data with no specified timestamp (READ_LATEST)
+TEST_F(ClientTest, TestScanAtLatestObservedTimestamp) {
+  const uint64_t pre_timestamp =
+      cluster_->mini_tablet_server(0)->server()->clock()->Now().ToUint64();
+  static const size_t kRowsNum = 2;
+  NO_FATALS(InsertTestRows(client_table_.get(), kRowsNum, 0));
+  const uint64_t ts_inserted_rows = client_->GetLatestObservedTimestamp();
+  // Delete one row (key == 0)
+  NO_FATALS(DeleteTestRows(client_table_.get(), 0, 1));
+  const uint64_t ts_deleted_row = client_->GetLatestObservedTimestamp();
+  // Insert the deleted row back.
+  NO_FATALS(InsertTestRows(client_table_.get(), 1, 0));
+  const uint64_t ts_all_rows = client_->GetLatestObservedTimestamp();
+  ASSERT_GT(ts_all_rows, ts_deleted_row);
+
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(KuduClientBuilder()
+      .add_master_server_addr(cluster_->mini_master()->bound_rpc_addr().ToString())
+      .Build(&client));
+  vector<shared_ptr<KuduClient>> clients{ client_, client };
+  for (auto& c : clients) {
+    SCOPED_TRACE((c == client_) ? "the same client" : "another client");
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(c->OpenTable(client_table_->name(), &table));
+    // There should be no rows in the table prior to the initial insert.
+    size_t row_count_before_inserted_rows;
+    ASSERT_OK(CountRowsOnLeaders(table.get(), KuduScanner::READ_AT_SNAPSHOT,
+                                 pre_timestamp, &row_count_before_inserted_rows));
+    EXPECT_EQ(0, row_count_before_inserted_rows);
+    // There should be kRowsNum rows if scanning at the initial insert timestamp.
+    size_t row_count_ts_inserted_rows;
+    ASSERT_OK(CountRowsOnLeaders(table.get(), KuduScanner::READ_AT_SNAPSHOT,
+                                 ts_inserted_rows, &row_count_ts_inserted_rows));
+    EXPECT_EQ(kRowsNum, row_count_ts_inserted_rows);
+    // There should be one less row if scanning at the deleted row timestamp.
+    size_t row_count_ts_deleted_row;
+    ASSERT_OK(CountRowsOnLeaders(table.get(), KuduScanner::READ_AT_SNAPSHOT,
+                                 ts_deleted_row, &row_count_ts_deleted_row));
+    EXPECT_EQ(kRowsNum - 1, row_count_ts_deleted_row);
+    // There should be kRowsNum rows if scanning at the 'after last insert'
+    // timestamp.
+    size_t row_count_ts_all_rows;
+    ASSERT_OK(CountRowsOnLeaders(table.get(), KuduScanner::READ_AT_SNAPSHOT,
+                                 ts_all_rows, &row_count_ts_all_rows));
+    EXPECT_EQ(kRowsNum, row_count_ts_all_rows);
+    // There should be kRowsNum rows if scanning in the 'read latest' mode.
+    size_t row_count_read_latest;
+    ASSERT_OK(CountRowsOnLeaders(table.get(), KuduScanner::READ_LATEST,
+                                 0, &row_count_read_latest));
+    EXPECT_EQ(kRowsNum, row_count_read_latest);
+  }
 }
 
 TEST_F(ClientTest, TestClonePredicates) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/06bb52d2/src/kudu/client/scan_configuration.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_configuration.cc b/src/kudu/client/scan_configuration.cc
index f736174..f396a99 100644
--- a/src/kudu/client/scan_configuration.cc
+++ b/src/kudu/client/scan_configuration.cc
@@ -32,6 +32,9 @@ using std::vector;
 namespace kudu {
 namespace client {
 
+const uint64_t ScanConfiguration::kNoTimestamp = KuduClient::kNoTimestamp;
+const int ScanConfiguration::kHtTimestampBitsToShift = 12;
+
 ScanConfiguration::ScanConfiguration(KuduTable* table)
     : table_(table),
       projection_(table->schema().schema_),

http://git-wip-us.apache.org/repos/asf/kudu/blob/06bb52d2/src/kudu/client/scan_configuration.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_configuration.h b/src/kudu/client/scan_configuration.h
index 4aaa888..dba2d2d 100644
--- a/src/kudu/client/scan_configuration.h
+++ b/src/kudu/client/scan_configuration.h
@@ -36,10 +36,6 @@ namespace client {
 // corresponding methods on KuduScanner.
 class ScanConfiguration {
  public:
-
-  static const int64_t kNoTimestamp = -1;
-  static const int kHtTimestampBitsToShift = 12;
-
   explicit ScanConfiguration(KuduTable* table);
   ~ScanConfiguration() = default;
 
@@ -124,7 +120,12 @@ class ScanConfiguration {
     return is_fault_tolerant_;
   }
 
-  int64_t snapshot_timestamp() const {
+  bool has_snapshot_timestamp() const {
+    return snapshot_timestamp_ != kNoTimestamp;
+  }
+
+  uint64_t snapshot_timestamp() const {
+    CHECK(has_snapshot_timestamp());
     return snapshot_timestamp_;
   }
 
@@ -139,6 +140,9 @@ class ScanConfiguration {
  private:
   friend class KuduScanTokenBuilder;
 
+  static const uint64_t kNoTimestamp;
+  static const int kHtTimestampBitsToShift;
+
   // Non-owned, non-null table.
   KuduTable* table_;
 
@@ -159,7 +163,7 @@ class ScanConfiguration {
 
   bool is_fault_tolerant_;
 
-  int64_t snapshot_timestamp_;
+  uint64_t snapshot_timestamp_;
 
   MonoDelta timeout_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/06bb52d2/src/kudu/client/scan_token-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc
index a9bddd5..c0d327d 100644
--- a/src/kudu/client/scan_token-internal.cc
+++ b/src/kudu/client/scan_token-internal.cc
@@ -135,15 +135,14 @@ Status KuduScanToken::Data::PBIntoScanner(KuduClient* client,
 
   if (message.has_read_mode()) {
     switch (message.read_mode()) {
-      case ReadMode::READ_LATEST: {
+      case ReadMode::READ_LATEST:
         RETURN_NOT_OK(scan_builder->SetReadMode(KuduScanner::READ_LATEST));
         break;
-      }
-      case ReadMode::READ_AT_SNAPSHOT: {
+      case ReadMode::READ_AT_SNAPSHOT:
         RETURN_NOT_OK(scan_builder->SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
         break;
-      }
-      default: return Status::InvalidArgument("scan token has unrecognized read mode");
+      default:
+        return Status::InvalidArgument("scan token has unrecognized read mode");
     }
   }
 
@@ -199,19 +198,23 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>*
tokens) {
     ColumnPredicateToPB(predicate_pair.second, pb.add_column_predicates());
   }
 
-  switch (configuration_.read_mode()) {
-    case KuduScanner::READ_LATEST: pb.set_read_mode(kudu::READ_LATEST); break;
-    case KuduScanner::READ_AT_SNAPSHOT: pb.set_read_mode(kudu::READ_AT_SNAPSHOT); break;
-    default: LOG(FATAL) << "Unexpected read mode.";
-  }
-
-  if (configuration_.snapshot_timestamp() != ScanConfiguration::kNoTimestamp) {
-    if (PREDICT_FALSE(configuration_.read_mode() != KuduScanner::READ_AT_SNAPSHOT)) {
-      LOG(WARNING) << "Scan token snapshot timestamp set but read mode was READ_LATEST."
-                      " Ignoring timestamp.";
-    } else {
-      pb.set_snap_timestamp(configuration_.snapshot_timestamp());
-    }
+  const KuduScanner::ReadMode read_mode = configuration_.read_mode();
+  switch (read_mode) {
+    case KuduScanner::READ_LATEST:
+      pb.set_read_mode(kudu::READ_LATEST);
+      if (configuration_.has_snapshot_timestamp()) {
+        LOG(WARNING) << "Ignoring snapshot timestamp since not in "
+                        "READ_AT_SNAPSHOT mode.";
+      }
+      break;
+    case KuduScanner::READ_AT_SNAPSHOT:
+      pb.set_read_mode(kudu::READ_AT_SNAPSHOT);
+      if (configuration_.has_snapshot_timestamp()) {
+        pb.set_snap_timestamp(configuration_.snapshot_timestamp());
+      }
+      break;
+    default:
+      LOG(FATAL) << Substitute("$0: unexpected read mode", read_mode);
   }
 
   pb.set_cache_blocks(configuration_.spec().cache_blocks());

http://git-wip-us.apache.org/repos/asf/kudu/blob/06bb52d2/src/kudu/client/scanner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 0ddc5de..129d0da 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -259,10 +259,23 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
   PrepareRequest(KuduScanner::Data::NEW);
   next_req_.clear_scanner_id();
   NewScanRequestPB* scan = next_req_.mutable_new_scan_request();
-  switch (configuration_.read_mode()) {
-    case READ_LATEST: scan->set_read_mode(kudu::READ_LATEST); break;
-    case READ_AT_SNAPSHOT: scan->set_read_mode(kudu::READ_AT_SNAPSHOT); break;
-    default: LOG(FATAL) << "Unexpected read mode.";
+  const KuduScanner::ReadMode read_mode = configuration_.read_mode();
+  switch (read_mode) {
+    case KuduScanner::READ_LATEST:
+      scan->set_read_mode(kudu::READ_LATEST);
+      if (configuration_.has_snapshot_timestamp()) {
+        LOG(WARNING) << "Ignoring snapshot timestamp since "
+                        "not in READ_AT_SNAPSHOT mode.";
+      }
+      break;
+    case KuduScanner::READ_AT_SNAPSHOT:
+      scan->set_read_mode(kudu::READ_AT_SNAPSHOT);
+      if (configuration_.has_snapshot_timestamp()) {
+        scan->set_snap_timestamp(configuration_.snapshot_timestamp());
+      }
+      break;
+    default:
+      LOG(FATAL) << Substitute("$0: unexpected read mode", read_mode);
   }
 
   if (configuration_.is_fault_tolerant()) {
@@ -279,13 +292,11 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
 
   scan->set_cache_blocks(configuration_.spec().cache_blocks());
 
-  if (configuration_.snapshot_timestamp() != ScanConfiguration::kNoTimestamp) {
-    if (PREDICT_FALSE(configuration_.read_mode() != READ_AT_SNAPSHOT)) {
-      LOG(WARNING) << "Scan snapshot timestamp set but read mode was READ_LATEST."
-          " Ignoring timestamp.";
-    } else {
-      scan->set_snap_timestamp(configuration_.snapshot_timestamp());
-    }
+  // For consistent operations, propagate the timestamp among all operations
+  // performed in the context of the same client.
+  const uint64_t lo_ts = table_->client()->data_->GetLatestObservedTimestamp();
+  if (lo_ts != KuduClient::kNoTimestamp) {
+    scan->set_propagated_timestamp(lo_ts);
   }
 
   // Set up the predicates.
@@ -405,8 +416,9 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
     }
   }
 
-  if (last_response_.has_snap_timestamp()) {
-    table_->client()->data_->UpdateLatestObservedTimestamp(last_response_.snap_timestamp());
+  if (last_response_.has_propagated_timestamp()) {
+    table_->client()->data_->UpdateLatestObservedTimestamp(
+        last_response_.propagated_timestamp());
   }
 
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/06bb52d2/src/kudu/integration-tests/consistency-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/consistency-itest.cc b/src/kudu/integration-tests/consistency-itest.cc
index 80e3bd9..4ff31c1 100644
--- a/src/kudu/integration-tests/consistency-itest.cc
+++ b/src/kudu/integration-tests/consistency-itest.cc
@@ -266,7 +266,7 @@ class TimestampPropagationTest : public MiniClusterITestBase {
 //     behind Ta server's time, and scanning at Tb's write time would not
 //     include the rows inserted into Ta.
 //
-TEST_F(TimestampPropagationTest, DISABLED_TwoBatchesAndReadAtSnapshot) {
+TEST_F(TimestampPropagationTest, TwoBatchesAndReadAtSnapshot) {
   uint64_t ts_a;
   {
     shared_ptr<KuduClient> client;

http://git-wip-us.apache.org/repos/asf/kudu/blob/06bb52d2/src/kudu/tserver/tablet_server-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 69c7e9c..4d9ab10 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -1050,8 +1050,8 @@ TEST_F(TabletServerTest, TestScannerOpenWhenServerShutsDown) {
 }
 
 TEST_F(TabletServerTest, TestSnapshotScan) {
-  int num_rows = AllowSlowTests() ? 1000 : 100;
-  int num_batches = AllowSlowTests() ? 100 : 10;
+  const int num_rows = AllowSlowTests() ? 1000 : 100;
+  const int num_batches = AllowSlowTests() ? 100 : 10;
   vector<uint64_t> write_timestamps_collector;
 
   // perform a series of writes and collect the timestamps
@@ -1084,6 +1084,7 @@ TEST_F(TabletServerTest, TestSnapshotScan) {
     ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns()));
     req.set_call_seq_id(0);
 
+    const Timestamp pre_scan_ts = mini_server_->server()->clock()->Now();
     // Send the call
     {
       SCOPED_TRACE(req.DebugString());
@@ -1093,6 +1094,12 @@ TEST_F(TabletServerTest, TestSnapshotScan) {
       ASSERT_FALSE(resp.has_error());
     }
 
+    // The 'propagated_timestamp' field must be set for 'success' responses.
+    ASSERT_TRUE(resp.has_propagated_timestamp());
+    ASSERT_GT(mini_server_->server()->clock()->Now().ToUint64(),
+              resp.propagated_timestamp());
+    ASSERT_LT(pre_scan_ts.ToUint64(), resp.propagated_timestamp());
+
     ASSERT_TRUE(resp.has_more_results());
     // Drain all the rows from the scanner.
     vector<string> results;
@@ -1135,7 +1142,7 @@ TEST_F(TabletServerTest, TestSnapshotScan_WithoutSnapshotTimestamp)
{
   req.set_batch_size_bytes(0); // so it won't return data right away
   scan->set_read_mode(READ_AT_SNAPSHOT);
 
-  Timestamp now = mini_server_->server()->clock()->Now();
+  const Timestamp pre_scan_ts = mini_server_->server()->clock()->Now();
 
   // Send the call
   {
@@ -1147,7 +1154,15 @@ TEST_F(TabletServerTest, TestSnapshotScan_WithoutSnapshotTimestamp)
{
   }
 
   // make sure that the snapshot timestamp that was selected is >= now
-  ASSERT_GE(resp.snap_timestamp(), now.ToUint64());
+  ASSERT_GE(resp.snap_timestamp(), pre_scan_ts.ToUint64());
+  // The 'propagated_timestamp' field must be set for all successful responses.
+  ASSERT_TRUE(resp.has_propagated_timestamp());
+  ASSERT_GT(mini_server_->server()->clock()->Now().ToUint64(),
+            resp.propagated_timestamp());
+  ASSERT_LT(pre_scan_ts.ToUint64(), resp.propagated_timestamp());
+  // The propagated timestamp should be after (i.e. greater than) the scan
+  // timestamp.
+  ASSERT_GT(resp.propagated_timestamp(), resp.snap_timestamp());
 }
 
 // Tests that a snapshot in the future (beyond the current time plus maximum

http://git-wip-us.apache.org/repos/asf/kudu/blob/06bb52d2/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 5f380c4..e0b334f 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -1108,6 +1108,7 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
       resp->set_last_primary_key(last.ToString());
     }
   }
+  resp->set_propagated_timestamp(server_->clock()->Now().ToUint64());
   SetResourceMetrics(resp->mutable_resource_metrics(), context);
   context->RespondSuccess();
 }
@@ -1482,6 +1483,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletPeer* tablet_peer,
         if (!s.ok()) {
           tmp_error_code = TabletServerErrorPB::INVALID_SNAPSHOT;
         }
+        break;
       }
       TRACE("Iterator created");
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/06bb52d2/src/kudu/tserver/tserver.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index ffafade..95f4fce 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -347,6 +347,10 @@ message ScanResponsePB {
 
   // The resource usage of this RPC.
   optional ResourceMetricsPB resource_metrics = 8;
+
+  // The server's time upon sending out the scan response. Should always
+  // be greater than the scan timestamp.
+  optional fixed64 propagated_timestamp = 9;
 }
 
 // A scanner keep-alive request.


Mime
View raw message