kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From granthe...@apache.org
Subject kudu git commit: KUDU-2095: [java] Add scanner keepAlive API to the Java client
Date Sat, 15 Sep 2018 22:25:13 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 72a77bfbf -> 42db87b0b


KUDU-2095: [java] Add scanner keepAlive API to the Java client

This patch adds keepAlive methods to the
AsyncKuduScanner and KuduScanner. These methods
leverage a package-private method added to the
AsyncKuduClient using a similar implementation
pattern to existing scan-related RPCs. The behavior of
this implementation mimics the C++ client.

Change-Id: Ic802f556c8860cdd43ef5f794c8f3658259bd0be
Reviewed-on: http://gerrit.cloudera.org:8080/11436
Reviewed-by: Adar Dembo <adar@cloudera.com>
Tested-by: Grant Henke <granthenke@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/42db87b0
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/42db87b0
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/42db87b0

Branch: refs/heads/master
Commit: 42db87b0b128c573b96e39615e7fa41227fea368
Parents: 72a77bf
Author: Grant Henke <granthenke@apache.org>
Authored: Thu Sep 13 14:05:55 2018 -0500
Committer: Grant Henke <granthenke@apache.org>
Committed: Sat Sep 15 22:12:37 2018 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java |  30 ++-
 .../apache/kudu/client/AsyncKuduScanner.java    |  88 ++++++-
 .../org/apache/kudu/client/KuduScanner.java     |  25 +-
 .../org/apache/kudu/client/TestKuduClient.java  | 233 +++++++++++++++----
 .../apache/kudu/client/TestRemoteTablet.java    |  21 ++
 src/kudu/client/client.h                        |  13 +-
 6 files changed, 347 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/42db87b0/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 8c1e032..62425a4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -1004,7 +1004,7 @@ public class AsyncKuduClient implements AutoCloseable {
 
   /**
    * Package-private access point for {@link AsyncKuduScanner}s to close themselves.
-   * @param scanner the scanner to close
+   * @param scanner the scanner to close.
    * @return a deferred object that indicates the completion of the request.
    * The {@link AsyncKuduScanner.Response} can contain rows that were left to scan.
    */
@@ -1028,6 +1028,34 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
+   * Package-private access point for {@link AsyncKuduScanner}s to keep themselves
+   * alive on tablet servers.
+   * @param scanner the scanner to keep alive.
+   * @return a deferred object that indicates the completion of the request.
+   */
+  Deferred<Void> keepAlive(final AsyncKuduScanner scanner) {
+    checkIsClosed();
+    final RemoteTablet tablet = scanner.currentTablet();
+    // Getting a null tablet here without being in a closed state means we were in between tablets.
+    // If there is no scanner to keep alive, we still return Status.OK().
+    if (tablet == null) {
+      return Deferred.fromResult(null);
+    }
+
+    final KuduRpc<Void> keepAliveRequest = scanner.getKeepAliveRequest();
+    final ServerInfo info = tablet.getReplicaSelectedServerInfo(keepAliveRequest.getReplicaSelection());
+    if (info == null) {
+      return Deferred.fromResult(null);
+    }
+
+    final Deferred<Void> d = keepAliveRequest.getDeferred();
+    keepAliveRequest.attempt++;
+    RpcProxy.sendRpc(this, connectionCache.getConnection(
+        info, Connection.CredentialsPolicy.ANY_CREDENTIALS), keepAliveRequest);
+    return d;
+  }
+
+  /**
    * Sends the provided {@link KuduRpc} to the tablet server hosting the leader
    * of the tablet identified by the RPC's table and partition key.
    *

http://git-wip-us.apache.org/repos/asf/kudu/blob/42db87b0/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index dd61bf4..804978e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -43,6 +43,8 @@ import com.google.protobuf.Message;
 import com.google.protobuf.UnsafeByteOperations;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
+import org.apache.kudu.tserver.Tserver.ScannerKeepAliveRequestPB;
+import org.apache.kudu.tserver.Tserver.ScannerKeepAliveResponsePB;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -662,14 +664,6 @@ public final class AsyncKuduScanner {
   }
 
   /**
-   * Sets the name of the tabletSlice that's hosting {@code this.start_key}.
-   * @param tablet The tabletSlice we're currently supposed to be scanning.
-   */
-  void setTablet(final RemoteTablet tablet) {
-    this.tablet = tablet;
-  }
-
-  /**
    * Invalidates this scanner and makes it assume it's no longer opened.
    * When a TabletServer goes away while we're scanning it, or some other type
    * of access problem happens, this method should be called so that the
@@ -704,6 +698,31 @@ public final class AsyncKuduScanner {
   }
 
   /**
+   * Keep the current remote scanner alive.
+   * <p>
+   * Keep the current remote scanner alive on the Tablet server for an
+   * additional time-to-live. This is useful if the interval in between
+   * nextRows() calls is big enough that the remote scanner might be garbage
+   * collected. The scanner time-to-live can be configured on the tablet
+   * server via the --scanner_ttl_ms configuration flag and has a default
+   * of 60 seconds.
+   * <p>
+   * This does not invalidate any previously fetched results.
+   * <p>
+   * Note that an error returned by this method should not be taken as indication
+   * that the scan has failed. Subsequent calls to nextRows() might still be successful,
+   * particularly if the scanner is configured to be fault tolerant.
+   * @return A deferred object that indicates the completion of the request.
+   * @throws IllegalStateException if the scanner is already closed.
+   */
+  public Deferred<Void> keepAlive() {
+    if (closed) {
+      throw new IllegalStateException("Scanner has already been closed");
+    }
+    return client.keepAlive(this);
+  }
+
+  /**
    * Returns an RPC to fetch the next rows.
    */
   KuduRpc<Response> getNextRowsRequest() {
@@ -718,6 +737,14 @@ public final class AsyncKuduScanner {
   }
 
   /**
+   * Returns an RPC to keep this scanner alive on the tablet server.
+   * @return a new {@link KeepAliveRequest}
+   */
+  KuduRpc<Void> getKeepAliveRequest() {
+    return new KeepAliveRequest(table, tablet);
+  }
+
+  /**
    * Throws an exception if scanning already started.
    * @throws IllegalStateException if scanning already started.
    */
@@ -796,6 +823,51 @@ public final class AsyncKuduScanner {
   }
 
   /**
+   * RPC sent out to keep a scanner alive on a TabletServer.
+   */
+  final class KeepAliveRequest extends KuduRpc<Void> {
+
+    KeepAliveRequest(KuduTable table, RemoteTablet tablet) {
+      super(table);
+      setTablet(tablet);
+      this.setTimeoutMillis(scanRequestTimeout);
+    }
+
+    @Override
+    String serviceName() {
+      return TABLET_SERVER_SERVICE_NAME;
+    }
+
+    @Override
+    String method() {
+      return "ScannerKeepAlive";
+    }
+
+    @Override
+    ReplicaSelection getReplicaSelection() {
+      return replicaSelection;
+    }
+
+    /** Serializes this request.  */
+    @Override
+    Message createRequestPB() {
+      final ScannerKeepAliveRequestPB.Builder builder = ScannerKeepAliveRequestPB.newBuilder();
+      builder.setScannerId(UnsafeByteOperations.unsafeWrap(scannerId));
+      return builder.build();
+    }
+
+    @Override
+    Pair<Void, Object> deserialize(final CallResponse callResponse,
+                                   String tsUUID) throws KuduException {
+      ScannerKeepAliveResponsePB.Builder builder = ScannerKeepAliveResponsePB.newBuilder();
+      readProtobuf(callResponse.getPBMessage(), builder);
+      ScannerKeepAliveResponsePB resp = builder.build();
+      TabletServerErrorPB error = resp.hasError() ? resp.getError() : null;
+      return new Pair<Void, Object>(null, error);
+    }
+  }
+
+  /**
    * RPC sent out to fetch the next rows from the TabletServer.
    */
   final class ScanRequest extends KuduRpc<Response> {

http://git-wip-us.apache.org/repos/asf/kudu/blob/42db87b0/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
index 13602a5..209fada 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
@@ -51,18 +51,39 @@ public class KuduScanner {
    * {@code Scanner} is done scanning), calling it again leads to an undefined
    * behavior.
    * @return a list of rows.
-   * @throws KuduException if anything went wrong
+   * @throws KuduException if anything went wrong.
    */
   public RowResultIterator nextRows() throws KuduException {
     return KuduClient.joinAndHandleException(asyncScanner.nextRows());
   }
 
   /**
+   * Keep the current remote scanner alive.
+   * <p>
+   * Keep the current remote scanner alive on the Tablet server for an
+   * additional time-to-live. This is useful if the interval in between
+   * nextRows() calls is big enough that the remote scanner might be garbage
+   * collected. The scanner time-to-live can be configured on the tablet
+   * server via the --scanner_ttl_ms configuration flag and has a default
+   * of 60 seconds.
+   * <p>
+   * This does not invalidate any previously fetched results.
+   * <p>
+   * Note that an exception thrown by this method should not be taken as indication
+   * that the scan has failed. Subsequent calls to nextRows() might still be successful,
+   * particularly if the scanner is configured to be fault tolerant.
+   * @throws KuduException if anything went wrong.
+   */
+  public final void keepAlive() throws KuduException {
+    KuduClient.joinAndHandleException(asyncScanner.keepAlive());
+  }
+
+  /**
    * Closes this scanner (don't forget to call this when you're done with it!).
    * <p>
    * Closing a scanner already closed has no effect.
    * @return a deferred object that indicates the completion of the request
-   * @throws KuduException if anything went wrong
+   * @throws KuduException if anything went wrong.
    */
   public RowResultIterator close() throws KuduException {
     return KuduClient.joinAndHandleException(asyncScanner.close());

http://git-wip-us.apache.org/repos/asf/kudu/blob/42db87b0/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 282ec03..80c0843 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -53,20 +53,44 @@ import java.util.concurrent.Future;
 import com.google.common.collect.ImmutableList;
 import com.stumbleupon.async.Deferred;
 
+import org.apache.kudu.client.MiniKuduCluster.MiniKuduClusterBuilder;
 import org.apache.kudu.util.TimestampUtil;
-import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.rules.TestName;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.util.CapturingLogAppender;
 import org.apache.kudu.util.DecimalUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestKuduClient extends BaseKuduTest {
-  private static final String tableName = "TestKuduClient";
+  private static final Logger LOG = LoggerFactory.getLogger(TestKuduClient.class);
+
+  private static final String TABLE_NAME = "TestKuduClient";
+
+  private static final int SHORT_SCANNER_TTL_MS = 5000;
+  private static final int SHORT_SCANNER_GC_US = SHORT_SCANNER_TTL_MS * 100; // 10% of the TTL.
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @Override
+  protected MiniKuduClusterBuilder getMiniClusterBuilder() {
+    MiniKuduClusterBuilder builder = super.getMiniClusterBuilder();
+    // Set a short scanner ttl for some tests.
+    if ("testKeepAlive".equals(testName.getMethodName()) ||
+        "testScannerExpiration".equals(testName.getMethodName())
+    ) {
+      LOG.info("Overriding scanner TTL and GC for testKeepAlive");
+      builder.addTserverFlag(String.format("--scanner_ttl_ms=%d", SHORT_SCANNER_TTL_MS));
+      builder.addTserverFlag(String.format("--scanner_gc_check_interval_us=%d", SHORT_SCANNER_GC_US));
+    }
+    return builder;
+  }
 
   /**
    * Test setting and reading the most recent propagated timestamp.
@@ -74,7 +98,7 @@ public class TestKuduClient extends BaseKuduTest {
   @Test(timeout = 100000)
   public void testLastPropagatedTimestamps() throws Exception {
     // Scan a table to ensure a timestamp is propagated.
-    KuduTable table = syncClient.createTable(tableName, basicSchema, getBasicCreateTableOptions());
+    KuduTable table = syncClient.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
     syncClient.newScannerBuilder(table).build().nextRows().getNumRows();
     assertTrue(syncClient.hasLastPropagatedTimestamp());
     assertTrue(client.hasLastPropagatedTimestamp());
@@ -104,22 +128,22 @@ public class TestKuduClient extends BaseKuduTest {
   @Test(timeout = 100000)
   public void testCreateDeleteTable() throws Exception {
     // Check that we can create a table.
-    syncClient.createTable(tableName, basicSchema, getBasicCreateTableOptions());
+    syncClient.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
     assertFalse(syncClient.getTablesList().getTablesList().isEmpty());
-    assertTrue(syncClient.getTablesList().getTablesList().contains(tableName));
+    assertTrue(syncClient.getTablesList().getTablesList().contains(TABLE_NAME));
 
     // Check that we can delete it.
-    syncClient.deleteTable(tableName);
-    assertFalse(syncClient.getTablesList().getTablesList().contains(tableName));
+    syncClient.deleteTable(TABLE_NAME);
+    assertFalse(syncClient.getTablesList().getTablesList().contains(TABLE_NAME));
 
     // Check that we can re-recreate it, with a different schema.
     List<ColumnSchema> columns = new ArrayList<>(basicSchema.getColumns());
     columns.add(new ColumnSchema.ColumnSchemaBuilder("one more", Type.STRING).build());
     Schema newSchema = new Schema(columns);
-    syncClient.createTable(tableName, newSchema, getBasicCreateTableOptions());
+    syncClient.createTable(TABLE_NAME, newSchema, getBasicCreateTableOptions());
 
     // Check that we can open a table and see that it has the new schema.
-    KuduTable table = syncClient.openTable(tableName);
+    KuduTable table = syncClient.openTable(TABLE_NAME);
     assertEquals(newSchema.getColumnCount(), table.getSchema().getColumnCount());
     assertTrue(table.getPartitionSchema().isSimpleRangePartitioning());
 
@@ -131,7 +155,6 @@ public class TestKuduClient extends BaseKuduTest {
                  newSchema.getColumn("column3_s").getCompressionAlgorithm());
   }
 
-
   /**
    * Test creating a table with various invalid schema cases.
    */
@@ -148,7 +171,7 @@ public class TestKuduClient extends BaseKuduTest {
     }
     Schema schema = new Schema(cols);
     try {
-      syncClient.createTable(tableName, schema, getBasicCreateTableOptions());
+      syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
       fail();
     } catch (NonRecoverableException nre) {
       assertThat(nre.toString(), containsString(
@@ -156,6 +179,122 @@ public class TestKuduClient extends BaseKuduTest {
     }
   }
 
+  /*
+   * Test the scanner behavior when a scanner is used beyond
+   * the scanner ttl without calling keepAlive.
+   * Note: The getMiniClusterBuilder override above depends on this method name.
+   */
+  @Test(timeout = 100000)
+  public void testScannerExpiration() throws Exception {
+    // Create a basic table and load it with data.
+    int numRows = 1000;
+    syncClient.createTable(
+        TABLE_NAME,
+        basicSchema,
+        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 2));
+    KuduSession session = syncClient.newSession();
+    KuduTable table = syncClient.openTable(TABLE_NAME);
+
+    for (int i = 0; i < numRows; i++) {
+      Insert insert = createBasicSchemaInsert(table, i);
+      session.apply(insert);
+    }
+
+    KuduScanner scanner = new KuduScanner.KuduScannerBuilder(client, table)
+        .replicaSelection(ReplicaSelection.CLOSEST_REPLICA)
+        .batchSizeBytes(100) // Use a small batch size so we can call nextRows many times.
+        .build();
+
+    // Initialize the scanner and verify we can read rows.
+    int rows = scanner.nextRows().getNumRows();
+    assertTrue("Scanner did not read any rows", rows > 0);
+
+    // Wait for the scanner to time out.
+    Thread.sleep(SHORT_SCANNER_TTL_MS * 2);
+
+    try {
+      scanner.nextRows();
+      fail("Exception was not thrown when accessing an expired scanner");
+    } catch (NonRecoverableException ex) {
+      assertThat(ex.getMessage(), containsString("Scanner not found"));
+    }
+
+    // Closing an expired scanner shouldn't throw an exception.
+    scanner.close();
+  }
+
+  /*
+   * Test keeping a scanner alive beyond scanner ttl.
+   * Note: The getMiniClusterBuilder override above depends on this method name.
+   */
+  @Test(timeout = 100000)
+  public void testKeepAlive() throws Exception {
+    // Create a basic table and load it with data.
+    int numRows = 1000;
+    syncClient.createTable(
+        TABLE_NAME,
+            basicSchema,
+            new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 2));
+    KuduSession session = syncClient.newSession();
+    KuduTable table = syncClient.openTable(TABLE_NAME);
+
+    for (int i = 0; i < numRows; i++) {
+      Insert insert = createBasicSchemaInsert(table, i);
+      session.apply(insert);
+    }
+
+    KuduScanner scanner = new KuduScanner.KuduScannerBuilder(client, table)
+        .replicaSelection(ReplicaSelection.CLOSEST_REPLICA)
+        .batchSizeBytes(100) // Use a small batch size so we can call nextRows many times.
+        .build();
+
+    // KeepAlive on uninitialized scanner should be ok.
+    scanner.keepAlive();
+    // Get the first batch and initialize the scanner
+    int accum = scanner.nextRows().getNumRows();
+
+    while (scanner.hasMoreRows()) {
+      int rows = scanner.nextRows().getNumRows();
+      accum += rows;
+      // Break when we are between tablets.
+      if (scanner.currentTablet() == null) {
+        LOG.info(String.format("Between tablets after scanning %d rows", accum));
+        break;
+      }
+      // Ensure we actually end up between tablets.
+      if (accum == numRows) {
+        fail("All rows were in a single tablet.");
+      }
+    }
+
+    // In between scanners now and should be ok.
+    scanner.keepAlive();
+
+    // Initialize the next scanner or keepAlive will have no effect.
+    accum += scanner.nextRows().getNumRows();
+
+    // Wait for longer than the scanner ttl calling keepAlive throughout.
+    // Each loop sleeps 25% of the scanner ttl and we loop 10 times to ensure
+    // we extend over 2x the scanner ttl.
+    for (int i = 0; i < 10; i++) {
+      Thread.sleep(SHORT_SCANNER_TTL_MS / 4);
+      scanner.keepAlive();
+    }
+
+    // Finish out the rows.
+    while (scanner.hasMoreRows()) {
+      accum += scanner.nextRows().getNumRows();
+    }
+    assertEquals("All rows were not scanned", numRows, accum);
+
+    // At this point the scanner is closed and there is nothing to keep alive.
+    try {
+      scanner.keepAlive();
+      fail("Exception was not thrown when calling keepAlive on a closed scanner");
+    } catch (IllegalStateException ex) {
+      assertThat(ex.getMessage(), containsString("Scanner has already been closed"));
+    }
+  }
 
   /**
    * Test creating a table with columns with different combinations of NOT NULL and
@@ -187,9 +326,9 @@ public class TestKuduClient extends BaseKuduTest {
              .defaultValue("def")
              .build());
     Schema schema = new Schema(cols);
-    syncClient.createTable(tableName, schema, getBasicCreateTableOptions());
+    syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
     KuduSession session = syncClient.newSession();
-    KuduTable table = syncClient.openTable(tableName);
+    KuduTable table = syncClient.openTable(TABLE_NAME);
 
     // Insert various rows. '-' indicates leaving the row unset in the insert.
     List<String> rows = ImmutableList.of(
@@ -244,10 +383,10 @@ public class TestKuduClient extends BaseKuduTest {
   @Test(timeout = 100000)
   public void testStrings() throws Exception {
     Schema schema = createManyStringsSchema();
-    syncClient.createTable(tableName, schema, getBasicCreateTableOptions());
+    syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
 
     KuduSession session = syncClient.newSession();
-    KuduTable table = syncClient.openTable(tableName);
+    KuduTable table = syncClient.openTable(TABLE_NAME);
     for (int i = 0; i < 100; i++) {
       Insert insert = table.newInsert();
       PartialRow row = insert.getRow();
@@ -298,10 +437,10 @@ public class TestKuduClient extends BaseKuduTest {
   @Test(timeout = 100000)
   public void testUTF8() throws Exception {
     Schema schema = createManyStringsSchema();
-    syncClient.createTable(tableName, schema, getBasicCreateTableOptions());
+    syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
 
     KuduSession session = syncClient.newSession();
-    KuduTable table = syncClient.openTable(tableName);
+    KuduTable table = syncClient.openTable(TABLE_NAME);
     Insert insert = table.newInsert();
     PartialRow row = insert.getRow();
     row.addString("key", "กขฃคฅฆง"); // some thai
@@ -325,12 +464,12 @@ public class TestKuduClient extends BaseKuduTest {
   @Test(timeout = 100000)
   public void testBinaryColumns() throws Exception {
     Schema schema = createSchemaWithBinaryColumns();
-    syncClient.createTable(tableName, schema, getBasicCreateTableOptions());
+    syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
 
     byte[] testArray = new byte[] {1, 2, 3, 4, 5, 6 ,7, 8, 9};
 
     KuduSession session = syncClient.newSession();
-    KuduTable table = syncClient.openTable(tableName);
+    KuduTable table = syncClient.openTable(TABLE_NAME);
     for (int i = 0; i < 100; i++) {
       Insert insert = table.newInsert();
       PartialRow row = insert.getRow();
@@ -368,12 +507,12 @@ public class TestKuduClient extends BaseKuduTest {
   @Test(timeout = 100000)
   public void testTimestampColumns() throws Exception {
     Schema schema = createSchemaWithTimestampColumns();
-    syncClient.createTable(tableName, schema, getBasicCreateTableOptions());
+    syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
 
     List<Long> timestamps = new ArrayList<>();
 
     KuduSession session = syncClient.newSession();
-    KuduTable table = syncClient.openTable(tableName);
+    KuduTable table = syncClient.openTable(TABLE_NAME);
     long lastTimestamp = 0;
     for (int i = 0; i < 100; i++) {
       Insert insert = table.newInsert();
@@ -416,10 +555,10 @@ public class TestKuduClient extends BaseKuduTest {
   @Test(timeout = 100000)
   public void testDecimalColumns() throws Exception {
     Schema schema = createSchemaWithDecimalColumns();
-    syncClient.createTable(tableName, schema, getBasicCreateTableOptions());
+    syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
 
     KuduSession session = syncClient.newSession();
-    KuduTable table = syncClient.openTable(tableName);
+    KuduTable table = syncClient.openTable(TABLE_NAME);
 
     // Verify ColumnTypeAttributes
     assertEquals(DecimalUtil.MAX_DECIMAL128_PRECISION,
@@ -455,8 +594,8 @@ public class TestKuduClient extends BaseKuduTest {
   */
   @Test
   public void testScanWithLimit() throws Exception {
-    syncClient.createTable(tableName, basicSchema, getBasicTableOptionsWithNonCoveredRange());
-    KuduTable table = syncClient.openTable(tableName);
+    syncClient.createTable(TABLE_NAME, basicSchema, getBasicTableOptionsWithNonCoveredRange());
+    KuduTable table = syncClient.openTable(TABLE_NAME);
     KuduSession session = syncClient.newSession();
     int num_rows = 100;
     for (int key = 0; key < num_rows; key++) {
@@ -505,11 +644,11 @@ public class TestKuduClient extends BaseKuduTest {
   @Test
   public void testScanWithPredicates() throws Exception {
     Schema schema = createManyStringsSchema();
-    syncClient.createTable(tableName, schema, getBasicCreateTableOptions());
+    syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
 
     KuduSession session = syncClient.newSession();
     session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
-    KuduTable table = syncClient.openTable(tableName);
+    KuduTable table = syncClient.openTable(TABLE_NAME);
     for (int i = 0; i < 100; i++) {
       Insert insert = table.newInsert();
       PartialRow row = insert.getRow();
@@ -628,11 +767,11 @@ public class TestKuduClient extends BaseKuduTest {
    */
   @Test(timeout = 100000)
   public void testScanNonCoveredTable() throws Exception {
-    syncClient.createTable(tableName, basicSchema, getBasicTableOptionsWithNonCoveredRange());
+    syncClient.createTable(TABLE_NAME, basicSchema, getBasicTableOptionsWithNonCoveredRange());
 
     KuduSession session = syncClient.newSession();
     session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
-    KuduTable table = syncClient.openTable(tableName);
+    KuduTable table = syncClient.openTable(TABLE_NAME);
 
     for (int key = 0; key < 100; key++) {
       session.apply(createBasicSchemaInsert(table, key));
@@ -657,8 +796,8 @@ public class TestKuduClient extends BaseKuduTest {
   @Test(timeout = 100000)
   public void testAutoClose() throws Exception {
     try (KuduClient localClient = new KuduClient.KuduClientBuilder(masterAddresses).build()) {
-      localClient.createTable(tableName, basicSchema, getBasicCreateTableOptions());
-      KuduTable table = localClient.openTable(tableName);
+      localClient.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
+      KuduTable table = localClient.openTable(TABLE_NAME);
       KuduSession session = localClient.newSession();
 
       session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
@@ -666,7 +805,7 @@ public class TestKuduClient extends BaseKuduTest {
       session.apply(insert);
     }
 
-    KuduTable table = syncClient.openTable(tableName);
+    KuduTable table = syncClient.openTable(TABLE_NAME);
     AsyncKuduScanner scanner = new AsyncKuduScanner.AsyncKuduScannerBuilder(client, table).build();
     assertEquals(1, countRowsInScan(scanner));
   }
@@ -730,7 +869,7 @@ public class TestKuduClient extends BaseKuduTest {
         .build();
     long buildTime = (System.nanoTime() - startTime) / 1000000000L;
     assertTrue("Building KuduClient is slow, maybe netty get stuck", buildTime < 3);
-    localClient.createTable(tableName, basicSchema, getBasicCreateTableOptions());
+    localClient.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
     Thread[] threads = new Thread[4];
     for (int t = 0; t < 4; t++) {
       final int id = t;
@@ -738,7 +877,7 @@ public class TestKuduClient extends BaseKuduTest {
         @Override
         public void run() {
           try {
-            KuduTable table = localClient.openTable(tableName);
+            KuduTable table = localClient.openTable(TABLE_NAME);
             KuduSession session = localClient.newSession();
             session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
             for (int i = 0; i < 100; i++) {
@@ -761,7 +900,7 @@ public class TestKuduClient extends BaseKuduTest {
 
   @Test(expected=IllegalArgumentException.class)
   public void testNoDefaultPartitioning() throws Exception {
-    syncClient.createTable(tableName, basicSchema, new CreateTableOptions());
+    syncClient.createTable(TABLE_NAME, basicSchema, new CreateTableOptions());
   }
 
   @Test(timeout = 100000)
@@ -773,8 +912,8 @@ public class TestKuduClient extends BaseKuduTest {
     upper.addInt("key", 1);
     options.addRangePartition(lower, upper);
 
-    syncClient.createTable(tableName, basicSchema, options);
-    KuduTable table = syncClient.openTable(tableName);
+    syncClient.createTable(TABLE_NAME, basicSchema, options);
+    KuduTable table = syncClient.openTable(TABLE_NAME);
 
     // Count the number of tablets.
     KuduScanToken.KuduScanTokenBuilder tokenBuilder = syncClient.newScanTokenBuilder(table);
@@ -791,7 +930,7 @@ public class TestKuduClient extends BaseKuduTest {
       upper = basicSchema.newPartialRow();
       lower.addInt("key", 1);
       alter.addRangePartition(lower, upper);
-      alterClient.alterTable(tableName, alter);
+      alterClient.alterTable(TABLE_NAME, alter);
     }
 
     // Count the number of tablets.  The result should still be the same, since
@@ -801,7 +940,7 @@ public class TestKuduClient extends BaseKuduTest {
     assertEquals(1, tokens.size());
 
     // Reopen the table and count the tablets again. The new tablet should now show up.
-    table = syncClient.openTable(tableName);
+    table = syncClient.openTable(TABLE_NAME);
     tokenBuilder = syncClient.newScanTokenBuilder(table);
     tokens = tokenBuilder.build();
     assertEquals(2, tokens.size());
@@ -810,7 +949,7 @@ public class TestKuduClient extends BaseKuduTest {
   @Test(timeout = 100000)
   public void testCreateTableWithConcurrentInsert() throws Exception {
     KuduTable table = syncClient.createTable(
-        tableName, createManyStringsSchema(), getBasicCreateTableOptions().setWait(false));
+        TABLE_NAME, createManyStringsSchema(), getBasicCreateTableOptions().setWait(false));
 
     // Insert a row.
     //
@@ -827,13 +966,13 @@ public class TestKuduClient extends BaseKuduTest {
 
     // This won't do anything useful (i.e. if the insert succeeds, we know the
     // table has been created), but it's here for additional code coverage.
-    assertTrue(syncClient.isCreateTableDone(tableName));
+    assertTrue(syncClient.isCreateTableDone(TABLE_NAME));
   }
 
   @Test(timeout = 100000)
   public void testCreateTableWithConcurrentAlter() throws Exception {
     // Kick off an asynchronous table creation.
-    Deferred<KuduTable> d = client.createTable(tableName,
+    Deferred<KuduTable> d = client.createTable(TABLE_NAME,
         createManyStringsSchema(), getBasicCreateTableOptions());
 
     // Rename the table that's being created to make sure it doesn't interfere
@@ -843,7 +982,7 @@ public class TestKuduClient extends BaseKuduTest {
     // actually exists.
     while (true) {
       try {
-        syncClient.alterTable(tableName,
+        syncClient.alterTable(TABLE_NAME,
             new AlterTableOptions().renameTable("foo"));
         break;
       } catch (KuduException e) {
@@ -894,7 +1033,7 @@ public class TestKuduClient extends BaseKuduTest {
                               final ReplicaSelection replicaSelection)
           throws Exception {
     Schema schema = createManyStringsSchema();
-    syncClient.createTable(tableName, schema, getBasicCreateTableOptions());
+    syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
 
     final int tasksNum = 4;
     List<Callable<Void>> callables = new ArrayList<>();
@@ -906,7 +1045,7 @@ public class TestKuduClient extends BaseKuduTest {
           // in the given flush mode.
           KuduSession session = syncClient.newSession();
           session.setFlushMode(flushMode);
-          KuduTable table = syncClient.openTable(tableName);
+          KuduTable table = syncClient.openTable(TABLE_NAME);
           for (int i = 0; i < 3; i++) {
             for (int j = 100 * i; j < 100 * (i + 1); j++) {
               Insert insert = table.newInsert();

http://git-wip-us.apache.org/repos/asf/kudu/blob/42db87b0/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
index 9ba6d00..c835f27 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
@@ -120,6 +120,27 @@ public class TestRemoteTablet {
         tablet.getReplicaSelectedServerInfo(ReplicaSelection.CLOSEST_REPLICA).getUuid());
   }
 
+  // AsyncKuduClient has methods like scanNextRows, keepAlive, and closeScanner that rely on
+  // RemoteTablet.getReplicaSelectedServerInfo to be deterministic given the same state.
+  // This ensures follow up calls are routed to the same server with the scanner open.
+  // This test ensures that remains true.
+  @Test
+  public void testGetReplicaSelectedServerInfoDeterminism() {
+    RemoteTablet tabletWithLocal = getTablet(0, 0);
+    verifyGetReplicaSelectedServerInfoDeterminism(tabletWithLocal);
+
+    RemoteTablet tabletWithRemote = getTablet(0, -1);
+    verifyGetReplicaSelectedServerInfoDeterminism(tabletWithRemote);
+  }
+
+  private void verifyGetReplicaSelectedServerInfoDeterminism(RemoteTablet tablet) {
+    String init = tablet.getReplicaSelectedServerInfo(ReplicaSelection.CLOSEST_REPLICA).getUuid();
+    for (int i = 0; i < 10; i++) {
+      String next = tablet.getReplicaSelectedServerInfo(ReplicaSelection.CLOSEST_REPLICA).getUuid();
+      assertEquals("getReplicaSelectedServerInfo was not deterministic", init, next);
+    }
+  }
+
   @Test
   public void testToString() {
     RemoteTablet tablet = getTablet(0, 1);

http://git-wip-us.apache.org/repos/asf/kudu/blob/42db87b0/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 795aae5..e479aa9 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -1953,11 +1953,13 @@ class KUDU_EXPORT KuduScanner {
 
   /// Keep the current remote scanner alive.
   ///
-  /// Keep the current remote scanner alive on the Tablet server
-  /// for an additional time-to-live (set by a configuration flag on
-  /// the tablet server). This is useful if the interval in between
+  /// Keep the current remote scanner alive on the Tablet server for an
+  /// additional time-to-live. This is useful if the interval in between
   /// NextBatch() calls is big enough that the remote scanner might be garbage
-  /// collected (default TTL is set to 60 secs.).
+  /// collected. The scanner time-to-live can be configured on the tablet
+  /// server via the --scanner_ttl_ms configuration flag and has a default
+  /// of 60 seconds.
+  ///
   /// This does not invalidate any previously fetched results.
   ///
   /// @return Operation result status. In particular, this method returns
@@ -1965,7 +1967,8 @@ class KUDU_EXPORT KuduScanner {
   ///   TabletServer was unreachable, for any reason. Note that a non-OK
   ///   status returned by this method should not be taken as indication
   ///   that the scan has failed. Subsequent calls to NextBatch() might
-  ///   still be successful, particularly if SetFaultTolerant() has been called.
+  ///   still be successful, particularly if the scanner is configured to be
+  ///   fault tolerant.
   Status KeepAlive();
 
   /// Close the scanner.


Mime
View raw message