kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ale...@apache.org
Subject [kudu] 01/04: [java] Improve TestAsyncKuduSession: Eliminate dead setup and other small things
Date Thu, 30 May 2019 05:05:45 GMT
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit f07778b341031630e202d281eb268f7b6c23d05e
Author: Will Berkeley <wdberkeley@gmail.com>
AuthorDate: Wed May 29 11:51:09 2019 -0700

    [java] Improve TestAsyncKuduSession: Eliminate dead setup and other small things
    
    This is the first in a series of patches improving TestAsyncKuduSession.
    The main thing this patch does is complete the transition of the test to
    using the KuduTestHarness. Some comments and extra setup and cleanup
    code were unnecessary now that the test uses the harness. I also did
    a few other small things.
    
    Change-Id: I2bc9f32c8e4b8745726e18189c55687246c8bab9
    Reviewed-on: http://gerrit.cloudera.org:8080/13458
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <adar@cloudera.com>
---
 .../apache/kudu/client/TestAsyncKuduSession.java   | 185 +++++++++------------
 1 file changed, 77 insertions(+), 108 deletions(-)

diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index cf05fcc..ba2845f 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -43,23 +43,10 @@ import org.apache.kudu.Schema;
 import org.apache.kudu.WireProtocol.AppStatusPB;
 import org.apache.kudu.tserver.Tserver.TabletServerErrorPB;
 
-/**
- * This class can either start its own cluster or rely on an existing one.
- * By default it assumes that the master is at localhost:64000.
- * The cluster's configuration flags is found at flagsPath as defined in the pom file.
- * Set startCluster to true in order have the test start the cluster for you.
- * All those properties are set via surefire's systemPropertyVariables, meaning this:
- * $ mvn test -DstartCluster=false
- * will use an existing cluster at default address found above.
- *
- * The test creates a table with a unique(ish) name which it deletes at the end.
- */
 public class TestAsyncKuduSession {
-  // Generate a unique table name
-  private static final String TABLE_NAME =
-      TestAsyncKuduSession.class.getName()+"-"+System.currentTimeMillis();
-
-  private static final Schema schema = getBasicSchema();
+  private static final String TABLE_NAME = TestAsyncKuduSession.class.getName();
+  private static final Schema SCHEMA = getBasicSchema();
+  private static final String INJECTED_TS_ERROR = "injected error for test";
 
   private static AsyncKuduClient client;
   private static KuduTable table;
@@ -70,9 +57,14 @@ public class TestAsyncKuduSession {
   @Before
   public void setUp() throws Exception {
     client = harness.getAsyncClient();
-    table = harness.getClient().createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
+    table = harness.getClient().createTable(TABLE_NAME, SCHEMA, getBasicCreateTableOptions());
   }
 
+  /**
+   * Test that errors in a background flush are surfaced to clients.
+   * TODO(wdberkeley): Improve the method of injecting errors into batches, here and below.
+   * @throws Exception
+   */
   @Test(timeout = 100000)
   public void testBackgroundErrors() throws Exception {
     try {
@@ -85,7 +77,7 @@ public class TestAsyncKuduSession {
       assertTrue(resp.hasRowError());
       assertTrue(
           resp.getRowError().getErrorStatus()
-              .getMessage().contains(getTabletServerErrorMessage()));
+              .getMessage().contains(INJECTED_TS_ERROR));
       assertEquals(1, session.countPendingErrors());
     } finally {
       Batch.injectTabletServerErrorAndLatency(null, 0);
@@ -93,8 +85,8 @@ public class TestAsyncKuduSession {
   }
 
   /**
-   * Regression test for case where an error in the previous batch could cause the next
-   * batch to hang in flush()
+   * Regression test for a case where an error in the previous batch could cause the next
+   * batch to hang in flush().
    */
   @Test(timeout = 100000)
   public void testBatchErrorCauseSessionStuck() throws Exception {
@@ -103,30 +95,30 @@ public class TestAsyncKuduSession {
       session.setFlushMode(AsyncKuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
       session.setFlushInterval(100);
       Batch.injectTabletServerErrorAndLatency(makeTabletServerError(), 200);
-      // 0ms: insert first row, which will be the first batch.
+      // 0ms: Insert the first row, which will be the first batch.
       Deferred<OperationResponse> resp1 = session.apply(createInsert(1));
       Thread.sleep(120);
-      // 100ms: start to send first batch.
-      // 100ms+: first batch got response from ts,
-      //         will wait 200s and throw error.
-      // 120ms: insert another row, which will be the second batch.
+      // 100ms: Start to send the first batch.
+      // 100ms+: The first batch receives a response from the tablet leader, and
+      //         will wait 200ms and throw an error.
+      // 120ms: Insert another row, which will be the second batch.
       Deferred<OperationResponse> resp2 = session.apply(createInsert(2));
-      // 220ms: start to send the second batch, but first batch is inflight,
-      //        so add callback to retry after first batch finishes.
-      // 300ms: first batch's callback handles error, retry second batch.
+      // 220ms: Start to send the second batch while the first batch is in flight.
+      // 300ms+: The first batch completes with an error. The second batch is in flight.
       {
         OperationResponse resp = resp1.join(DEFAULT_SLEEP);
         assertTrue(resp.hasRowError());
         assertTrue(
             resp.getRowError().getErrorStatus()
-                .getMessage().contains(getTabletServerErrorMessage()));
+                .getMessage().contains(INJECTED_TS_ERROR));
       }
+      // 300ms++: The second batch completes with an error. It does not remain stuck flushing.
       {
         OperationResponse resp = resp2.join(DEFAULT_SLEEP);
         assertTrue(resp.hasRowError());
         assertTrue(
             resp.getRowError().getErrorStatus()
-                .getMessage().contains(getTabletServerErrorMessage()));
+                .getMessage().contains(INJECTED_TS_ERROR));
       }
       assertFalse(session.hasPendingOperations());
     } finally {
@@ -135,62 +127,54 @@ public class TestAsyncKuduSession {
   }
 
   /**
-   * Regression test for case when tablet lookup error causes original RPC to get stuck.
+   * Regression test for a case when a tablet lookup error causes the original write RPC to hang.
    * @throws Exception
    */
   @Test(timeout = 100000)
-  public void testGetTableLocationsErrorCauseSessionStuck() throws Exception {
+  public void testGetTableLocationsErrorCausesStuckSession() throws Exception {
     AsyncKuduSession session = client.newSession();
-    // Make sure tablet locations is cached.
+    // Make sure tablet locations are cached.
     Insert insert = createInsert(1);
     session.apply(insert).join(DEFAULT_SLEEP);
     RemoteTablet rt =
         client.getTableLocationEntry(table.getTableId(), insert.partitionKey()).getTablet();
     String tabletId = rt.getTabletId();
     RpcProxy proxy = client.newRpcProxy(rt.getLeaderServerInfo());
-    try {
-      // Delete table so we get table not found error.
-      client.deleteTable(TABLE_NAME).join();
-      // Wait until tablet is deleted on TS.
-      while (true) {
-        ListTabletsRequest req = new ListTabletsRequest(client.getTimer(), 10000);
-        Deferred<ListTabletsResponse> d = req.getDeferred();
-        proxy.sendRpc(req);
-        ListTabletsResponse resp = d.join();
-        if (!resp.getTabletsList().contains(tabletId)) {
-          break;
-        }
-        Thread.sleep(100);
+    // Delete the table so subsequent writes fail with 'table not found'.
+    client.deleteTable(TABLE_NAME).join();
+    // Wait until the tablet is deleted on the TS.
+    while (true) {
+      ListTabletsRequest req = new ListTabletsRequest(client.getTimer(), 10000);
+      Deferred<ListTabletsResponse> d = req.getDeferred();
+      proxy.sendRpc(req);
+      ListTabletsResponse resp = d.join();
+      if (!resp.getTabletsList().contains(tabletId)) {
+        break;
       }
-
-      OperationResponse response = session.apply(createInsert(1)).join(DEFAULT_SLEEP);
-      assertTrue(response.hasRowError());
-      assertTrue(response.getRowError().getErrorStatus().isNotFound());
-    } finally {
-      table = harness.getClient().createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
+      Thread.sleep(100);
     }
+
+    OperationResponse response = session.apply(createInsert(1)).join(DEFAULT_SLEEP);
+    assertTrue(response.hasRowError());
+    assertTrue(response.getRowError().getErrorStatus().isNotFound());
   }
 
   /** Regression test for a failure to correctly handle a timeout when flushing a batch. */
   @Test
   public void testInsertIntoUnavailableTablet() throws Exception {
     harness.killAllTabletServers();
-    try {
-      AsyncKuduSession session = client.newSession();
-      session.setTimeoutMillis(1);
-      OperationResponse response = session.apply(createInsert(1)).join();
-      assertTrue(response.hasRowError());
-      assertTrue(response.getRowError().getErrorStatus().isTimedOut());
-
-      session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
-      Insert insert = createInsert(1);
-      session.apply(insert);
-      List<OperationResponse> responses = session.flush().join();
-      assertEquals(1, responses.size());
-      assertTrue(responses.get(0).getRowError().getErrorStatus().isTimedOut());
-    } finally {
-      harness.startAllTabletServers();
-    }
+    AsyncKuduSession session = client.newSession();
+    session.setTimeoutMillis(1);
+    OperationResponse response = session.apply(createInsert(1)).join();
+    assertTrue(response.hasRowError());
+    assertTrue(response.getRowError().getErrorStatus().isTimedOut());
+
+    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+    Insert insert = createInsert(1);
+    session.apply(insert);
+    List<OperationResponse> responses = session.flush().join();
+    assertEquals(1, responses.size());
+    assertTrue(responses.get(0).getRowError().getErrorStatus().isTimedOut());
   }
 
   /**
@@ -206,34 +190,28 @@ public class TestAsyncKuduSession {
     // to before.
     KuduTable nonReplicatedTable = harness.getClient().createTable(
         "non-replicated",
-        schema,
+        SCHEMA,
         getBasicCreateTableOptions().setNumReplicas(1));
 
-    try {
-      // Write before doing any restarts to establish a connection.
-      AsyncKuduSession session = client.newSession();
-      session.setTimeoutMillis(30000);
-      session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
-      session.apply(createBasicSchemaInsert(nonReplicatedTable, 1)).join();
-
-      int numClientsBefore = client.getConnectionListCopy().size();
+    // Write before doing any restarts to establish a connection.
+    AsyncKuduSession session = client.newSession();
+    session.setTimeoutMillis(30000);
+    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
+    session.apply(createBasicSchemaInsert(nonReplicatedTable, 1)).join();
 
-      // Restart all the tablet servers.
-      harness.killAllTabletServers();
-      harness.startAllTabletServers();
+    int numClientsBefore = client.getConnectionListCopy().size();
 
-      // Perform another write, which will require reconnecting to the same
-      // tablet server that we wrote to above.
-      session.apply(createBasicSchemaInsert(nonReplicatedTable, 2)).join();
+    // Restart all the tablet servers.
+    harness.killAllTabletServers();
+    harness.startAllTabletServers();
 
-      // We should not have leaked an entry in the client2tablets map.
-      int numClientsAfter = client.getConnectionListCopy().size();
-      assertEquals(numClientsBefore, numClientsAfter);
-    } finally {
-      harness.startAllTabletServers();
+    // Perform another write, which will require reconnecting to the same
+    // tablet server that we wrote to above.
+    session.apply(createBasicSchemaInsert(nonReplicatedTable, 2)).join();
 
-      client.deleteTable("non-replicated").join();
-    }
+    // We should not have leaked an entry in the client2tablets map.
+    int numClientsAfter = client.getConnectionListCopy().size();
+    assertEquals(numClientsBefore, numClientsAfter);
   }
 
   @Test(timeout = 100000)
@@ -460,8 +438,7 @@ public class TestAsyncKuduSession {
     return delete;
   }
 
-  public boolean exists(final int key) throws Exception {
-
+  private boolean exists(final int key) throws Exception {
     AsyncKuduScanner scanner = getScanner(key, key + 1);
     final AtomicBoolean exists = new AtomicBoolean(false);
 
@@ -494,8 +471,7 @@ public class TestAsyncKuduSession {
     return exists.get();
   }
 
-  public int countNullColumns(final int startKey, final int endKey) throws Exception {
-
+  private int countNullColumns(final int startKey, final int endKey) throws Exception {
     AsyncKuduScanner scanner = getScanner(startKey, endKey);
     final AtomicInteger ai = new AtomicInteger();
 
@@ -523,10 +499,8 @@ public class TestAsyncKuduSession {
     return ai.get();
   }
 
-  public int countInRange(final int start, final int exclusiveEnd) throws Exception {
-
-    AsyncKuduScanner scanner = getScanner(start, exclusiveEnd);
-    return countRowsInScan(scanner);
+  private int countInRange(final int start, final int exclusiveEnd) throws Exception {
+    return countRowsInScan(getScanner(start, exclusiveEnd));
   }
 
   private AsyncKuduScanner getScanner(int start, int exclusiveEnd) {
@@ -535,12 +509,11 @@ public class TestAsyncKuduSession {
 
   private AsyncKuduScanner getScanner(int start, int exclusiveEnd,
                                              List<String> columnNames) {
+    PartialRow lowerBound = SCHEMA.newPartialRow();
+    lowerBound.addInt(SCHEMA.getColumnByIndex(0).getName(), start);
 
-    PartialRow lowerBound = schema.newPartialRow();
-    lowerBound.addInt(schema.getColumnByIndex(0).getName(), start);
-
-    PartialRow upperBound = schema.newPartialRow();
-    upperBound.addInt(schema.getColumnByIndex(0).getName(), exclusiveEnd);
+    PartialRow upperBound = SCHEMA.newPartialRow();
+    upperBound.addInt(SCHEMA.getColumnByIndex(0).getName(), exclusiveEnd);
 
     return client.newScannerBuilder(table)
         .lowerBound(lowerBound)
@@ -554,12 +527,8 @@ public class TestAsyncKuduSession {
         .setCode(TabletServerErrorPB.Code.UNKNOWN_ERROR)
         .setStatus(AppStatusPB.newBuilder()
             .setCode(AppStatusPB.ErrorCode.UNKNOWN_ERROR)
-            .setMessage(getTabletServerErrorMessage())
+            .setMessage(INJECTED_TS_ERROR)
             .build())
         .build();
   }
-
-  private String getTabletServerErrorMessage() {
-    return "injected error for test";
-  }
 }


Mime
View raw message