kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [2/2] kudu git commit: [java] KUDU-1679 Propagate timestamps for scans
Date Tue, 29 Nov 2016 04:13:12 GMT
[java] KUDU-1679 Propagate timestamps for scans

This is Java counterpart for 06bb52d2acc6d311144aa905101ec5d846096611.

Change-Id: I84d45ba395f2a3fc6b54591f4a45bb4f10435910
Reviewed-on: http://gerrit.cloudera.org:8080/5248
Reviewed-by: David Ribeiro Alves <dralves@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/2f1a2a06
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/2f1a2a06
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/2f1a2a06

Branch: refs/heads/master
Commit: 2f1a2a06d4fe719ef9f35338bc7403915517718a
Parents: 16ce5e0
Author: Alexey Serbin <aserbin@cloudera.com>
Authored: Mon Nov 28 12:15:09 2016 -0800
Committer: Alexey Serbin <aserbin@cloudera.com>
Committed: Tue Nov 29 03:19:28 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java |  7 ++---
 .../apache/kudu/client/AsyncKuduScanner.java    | 18 +++++++++--
 .../kudu/client/TestScannerMultiTablet.java     | 33 ++++++++++++++++++--
 3 files changed, 49 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/2f1a2a06/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 0de1fb8..5d7f912 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -207,20 +207,17 @@ public class AsyncKuduClient implements AutoCloseable {
 
   /**
    * Updates the last timestamp received from a server. Used for CLIENT_PROPAGATED
-   * external consistency. This is only publicly visible so that it can be set
-   * on tests, users should generally disregard this method.
+   * external consistency.
    *
    * @param lastPropagatedTimestamp the last timestamp received from a server
    */
-  @VisibleForTesting
   public synchronized void updateLastPropagatedTimestamp(long lastPropagatedTimestamp) {
-    if (this.lastPropagatedTimestamp == -1 ||
+    if (this.lastPropagatedTimestamp == NO_TIMESTAMP ||
         this.lastPropagatedTimestamp < lastPropagatedTimestamp) {
       this.lastPropagatedTimestamp = lastPropagatedTimestamp;
     }
   }
 
-  @VisibleForTesting
   public synchronized long getLastPropagatedTimestamp() {
     return lastPropagatedTimestamp;
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/2f1a2a06/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 2642def..40906ae 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -380,6 +380,9 @@ public final class AsyncKuduScanner {
             // context of the same scan.
             htTimestamp = resp.scanTimestamp;
           }
+          if (resp.propagatedTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
+            client.updateLastPropagatedTimestamp(resp.propagatedTimestamp);
+          }
           if (!resp.more || resp.scannerId == null) {
             scanFinished();
             return Deferred.fromResult(resp.data); // there might be data to return
@@ -657,14 +660,23 @@ public final class AsyncKuduScanner {
      */
     private final long scanTimestamp;
 
+    /**
+     * The server timestamp to propagate, if set. If the server response does
+     * not contain propagation timestamp, this field is set to special value
+     * AsyncKuduClient.NO_TIMESTAMP
+     */
+    private final long propagatedTimestamp;
+
     Response(final byte[] scannerId,
              final RowResultIterator data,
              final boolean more,
-             final long scanTimestamp) {
+             final long scanTimestamp,
+             final long propagatedTimestamp) {
       this.scannerId = scannerId;
       this.data = data;
       this.more = more;
       this.scanTimestamp = scanTimestamp;
+      this.propagatedTimestamp = propagatedTimestamp;
     }
 
     public String toString() {
@@ -817,7 +829,9 @@ public final class AsyncKuduScanner {
       }
       Response response = new Response(id, iterator, hasMore,
           resp.hasSnapTimestamp() ? resp.getSnapTimestamp()
-                                  : AsyncKuduClient.NO_TIMESTAMP);
+                                  : AsyncKuduClient.NO_TIMESTAMP,
+          resp.hasPropagatedTimestamp() ? resp.getPropagatedTimestamp()
+                                        : AsyncKuduClient.NO_TIMESTAMP);
       if (LOG.isDebugEnabled()) {
         LOG.debug(response.toString());
       }

http://git-wip-us.apache.org/repos/asf/kudu/blob/2f1a2a06/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
index ef62660..f8b14aa 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
@@ -200,8 +200,8 @@ public class TestScannerMultiTablet extends BaseKuduTest {
     // specified. Verify that the scanner timestamp is set from the tablet
     // server response.
     AsyncKuduScanner scanner = client.newScannerBuilder(table)
-            .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
-            .build();
+        .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
+        .build();
     assertEquals(AsyncKuduClient.NO_TIMESTAMP, scanner.getSnapshotTimestamp());
     KuduScanner syncScanner = new KuduScanner(scanner);
     assertEquals(scanner.getReadMode(), syncScanner.getReadMode());
@@ -223,6 +223,35 @@ public class TestScannerMultiTablet extends BaseKuduTest {
     assertEquals(9, rowCount);
   }
 
+  @Test(timeout = 100000)
+  public void testScanPropagatesLatestTimestamp() throws Exception {
+    AsyncKuduScanner scanner = client.newScannerBuilder(table).build();
+    // Initially, the client does not have the timestamp set.
+    assertEquals(AsyncKuduClient.NO_TIMESTAMP, client.getLastPropagatedTimestamp());
+    KuduScanner syncScanner = new KuduScanner(scanner);
+
+    assertTrue(syncScanner.hasMoreRows());
+    assertEquals(AsyncKuduClient.NO_TIMESTAMP, client.getLastPropagatedTimestamp());
+
+    int rowCount = syncScanner.nextRows().getNumRows();
+    // At this point, the call to the first tablet server should have been
+    // done already, so the client should have received the propagated timestamp
+    // in the scanner response.
+    long tsRef = client.getLastPropagatedTimestamp();
+    assertNotEquals(AsyncKuduClient.NO_TIMESTAMP, tsRef);
+
+    assertTrue(syncScanner.hasMoreRows());
+    while (syncScanner.hasMoreRows()) {
+      rowCount += syncScanner.nextRows().getNumRows();
+      final long ts = client.getLastPropagatedTimestamp();
+      // Next scan responses from tablet servers should move the propagated
+      // timestamp further.
+      assertTrue(ts > tsRef);
+      tsRef = ts;
+    }
+    assertNotEquals(0, rowCount);
+  }
+
   private AsyncKuduScanner getScanner(String lowerBoundKeyOne,
                                       String lowerBoundKeyTwo,
                                       String exclusiveUpperBoundKeyOne,


Mime
View raw message