kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ha...@apache.org
Subject kudu git commit: [java client] update propagated TS for AUTO_FLUSH_SYNC flush mode
Date Mon, 08 Jan 2018 20:10:53 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 1277f69a1 -> 72c83c571


[java client] update propagated TS for AUTO_FLUSH_SYNC flush mode

Commit be6b81057 made the Java client update the propagated timestamp
for all flush modes except AUTO_FLUSH_SYNC. This patch adds timestamp
propagation for that mode as well and updates TestHybridTime to ensure
that the propagated timestamp is updated for all flush modes.

Change-Id: Ibf0ca58b10842cb15ed5db7bcd4694c4d8cc3a89
Reviewed-on: http://gerrit.cloudera.org:8080/8837
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <davidralves@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/72c83c57
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/72c83c57
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/72c83c57

Branch: refs/heads/master
Commit: 72c83c571120b0949c0e576fb8af0089861436e1
Parents: 1277f69
Author: hahao <hao.hao@cloudera.com>
Authored: Wed Dec 13 17:19:45 2017 -0800
Committer: Hao Hao <hao.hao@cloudera.com>
Committed: Mon Jan 8 20:01:19 2018 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java | 10 ++++++++
 .../apache/kudu/client/AsyncKuduSession.java    | 11 ++++++++
 .../java/org/apache/kudu/client/KuduClient.java | 10 ++++++++
 .../org/apache/kudu/client/TestHybridTime.java  | 27 +++++++++++---------
 .../kudu/client/TestScannerMultiTablet.java     |  2 ++
 5 files changed, 48 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/72c83c57/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index f23b342..dd282ff 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -333,6 +333,16 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
+   * Checks if the client received any timestamps from a server. Used for
+   * CLIENT_PROPAGATED external consistency.
+   *
+   * @return true if last propagated timestamp has been set
+   */
+  public synchronized boolean hasLastPropagatedTimestamp() {
+    return lastPropagatedTimestamp != NO_TIMESTAMP;
+  }
+
+  /**
    * Returns a synchronous {@link KuduClient} which wraps this asynchronous client.
    * Calling {@link KuduClient#close} on the returned client will close this client.
    * If this asynchronous client should outlive the returned synchronous client,

http://git-wip-us.apache.org/repos/asf/kudu/blob/72c83c57/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index b23cbba..4ee3ba3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -530,7 +530,18 @@ public class AsyncKuduSession implements SessionConfiguration {
       }
       operation.setExternalConsistencyMode(this.consistencyMode);
       operation.setIgnoreAllDuplicateRows(ignoreAllDuplicateRows);
+
+      // Add a callback to update the propagated timestamp returned from the server.
+      Callback<Deferred<OperationResponse>, OperationResponse> cb =
+        new Callback<Deferred<OperationResponse>, OperationResponse>() {
+          @Override
+          public Deferred<OperationResponse> call(OperationResponse resp) throws Exception {
+            client.updateLastPropagatedTimestamp(resp.getWriteTimestampRaw());
+            return Deferred.fromResult(resp);
+          }
+        };
       return client.sendRpcToTablet(operation)
+          .addCallbackDeferring(cb)
           .addErrback(new SingleOperationErrCallback(operation));
     }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/72c83c57/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
index b4b1d80..0539d21 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -73,6 +73,16 @@ public class KuduClient implements AutoCloseable {
   }
 
   /**
+   * Checks if the client received any timestamps from a server. Used for
+   * CLIENT_PROPAGATED external consistency.
+   *
+   * @return true if last propagated timestamp has been set
+   */
+  public boolean hasLastPropagatedTimestamp() {
+    return asyncClient.hasLastPropagatedTimestamp();
+  }
+
+  /**
    * Create a table on the cluster with the specified name, schema, and table configurations.
    * @param name the table's name
    * @param schema the table's schema

http://git-wip-us.apache.org/repos/asf/kudu/blob/72c83c57/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
index 2655134..a19861c 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
@@ -87,23 +87,25 @@ public class TestHybridTime extends BaseKuduTest {
    */
   @Test(timeout = 100000)
   public void test() throws Exception {
-    AsyncKuduSession session = client.newSession();
-    session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_SYNC);
+    KuduSession session = syncClient.newSession();
+
+    // Test timestamp propagation with AUTO_FLUSH_SYNC flush mode.
+    session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_SYNC);
     session.setExternalConsistencyMode(CLIENT_PROPAGATED);
     long[] clockValues;
     long previousLogicalValue = 0;
     long previousPhysicalValue = 0;
 
-    // Test timestamp propagation with single operations
     String[] keys = new String[] {"1", "2", "3"};
     for (int i = 0; i < keys.length; i++) {
       Insert insert = table.newInsert();
       PartialRow row = insert.getRow();
       row.addString(schema.getColumnByIndex(0).getName(), keys[i]);
-      Deferred<OperationResponse> d = session.apply(insert);
-      OperationResponse response = d.join(DEFAULT_SLEEP);
-      assertTrue(response.getWriteTimestampRaw() != 0);
-      clockValues = HTTimestampToPhysicalAndLogical(response.getWriteTimestampRaw());
+      OperationResponse response = session.apply(insert);
+      assertTrue(client.hasLastPropagatedTimestamp());
+      assertEquals(client.getLastPropagatedTimestamp(),
+                   response.getWriteTimestampRaw());
+      clockValues = HTTimestampToPhysicalAndLogical(client.getLastPropagatedTimestamp());
       LOG.debug("Clock value after write[" + i + "]: " + new Date(clockValues[0] / 1000).toString()
         + " Logical value: " + clockValues[1]);
       // on the very first write we update the clock into the future
@@ -123,7 +125,7 @@ public class TestHybridTime extends BaseKuduTest {
       }
     }
 
-    // Test timestamp propagation with Batches
+    // Test timestamp propagation with MANUAL_FLUSH flush mode.
     session.setFlushMode(AsyncKuduSession.FlushMode.MANUAL_FLUSH);
     keys = new String[] {"11", "22", "33"};
     for (int i = 0; i < keys.length; i++) {
@@ -131,14 +133,15 @@ public class TestHybridTime extends BaseKuduTest {
       PartialRow row = insert.getRow();
       row.addString(schema.getColumnByIndex(0).getName(), keys[i]);
       session.apply(insert);
-      Deferred<List<OperationResponse>> d = session.flush();
-      List<OperationResponse> responses = d.join(DEFAULT_SLEEP);
+      List<OperationResponse> responses = session.flush();
       assertEquals("Response was not of the expected size: " + responses.size(),
         1, responses.size());
 
       OperationResponse response = responses.get(0);
-      assertTrue(response.getWriteTimestampRaw() != 0);
-      clockValues = HTTimestampToPhysicalAndLogical(response.getWriteTimestampRaw());
+      assertTrue(client.hasLastPropagatedTimestamp());
+      assertEquals(client.getLastPropagatedTimestamp(),
+                   response.getWriteTimestampRaw());
+      clockValues = HTTimestampToPhysicalAndLogical(client.getLastPropagatedTimestamp());
       LOG.debug("Clock value after write[" + i + "]: " + new Date(clockValues[0] / 1000).toString()
         + " Logical value: " + clockValues[1]);
       assertEquals(clockValues[0], previousPhysicalValue);

http://git-wip-us.apache.org/repos/asf/kudu/blob/72c83c57/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
index 036f857..0365387 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
@@ -283,6 +283,8 @@ public class TestScannerMultiTablet extends BaseKuduTest {
 
   @Test(timeout = 100000)
   public void testScanTokenPropagatesTimestamp() throws Exception {
+    resetClients();
+
     // Initially, the client does not have the timestamp set.
     assertEquals(AsyncKuduClient.NO_TIMESTAMP, client.getLastPropagatedTimestamp());
     assertEquals(KuduClient.NO_TIMESTAMP, syncClient.getLastPropagatedTimestamp());


Mime
View raw message