kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aw...@apache.org
Subject [1/4] kudu git commit: KUDU-2261: The order of the responses after flush should match the order we call apply
Date Sat, 20 Jan 2018 01:18:46 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 31a720fc3 -> 6626e109d


KUDU-2261: The order of the responses after flush should match the order we call apply

The responses returned by flush() should be in the same order in which the
operations were applied, so it's easier to know which operation failed and
which succeeded.

Change-Id: Ib37c9e85ad03731bb7d5b83be77d40fcd95e803a
Reviewed-on: http://gerrit.cloudera.org:8080/9029
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b6889d7a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b6889d7a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b6889d7a

Branch: refs/heads/master
Commit: b6889d7aced7e4b6ad40b63fbe2da117294f3803
Parents: 31a720f
Author: zhangzhen <zhangzhen@xiaomi.com>
Authored: Tue Jan 16 18:49:53 2018 +0800
Committer: Todd Lipcon <todd@apache.org>
Committed: Fri Jan 19 00:24:43 2018 +0000

----------------------------------------------------------------------
 .../apache/kudu/client/AsyncKuduSession.java    | 25 ++++++++++++----
 .../main/java/org/apache/kudu/client/Batch.java |  8 +++--
 .../org/apache/kudu/client/BatchResponse.java   | 17 +++++++++--
 .../org/apache/kudu/client/TestKuduSession.java | 31 ++++++++++++++++++++
 4 files changed, 71 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b6889d7a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index 4ee3ba3..66b58b4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -20,6 +20,7 @@ package org.apache.kudu.client;
 import static org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -342,7 +343,9 @@ public class AsyncKuduSession implements SessionConfiguration {
       // Group the operations by tablet.
       Map<Slice, Batch> batches = new HashMap<>();
       List<OperationResponse> opsFailedInLookup = new ArrayList<>();
+      List<Integer> opsFailedIndexesList = new ArrayList<>();
 
+      int currentIndex = 0;
       for (BufferedOperation bufferedOp : buffer.getOperations()) {
         Operation operation = bufferedOp.getOperation();
         if (bufferedOp.tabletLookupFailed()) {
@@ -366,6 +369,7 @@ public class AsyncKuduSession implements SessionConfiguration {
           }
           operation.callback(response);
           opsFailedInLookup.add(response);
+          opsFailedIndexesList.add(currentIndex++);
           continue;
         }
         LocatedTablet tablet = bufferedOp.getTablet();
@@ -376,12 +380,13 @@ public class AsyncKuduSession implements SessionConfiguration {
           batch = new Batch(operation.getTable(), tablet, ignoreAllDuplicateRows);
           batches.put(tabletId, batch);
         }
-        batch.add(operation);
+        batch.add(operation, currentIndex++);
       }
 
       List<Deferred<BatchResponse>> batchResponses = new ArrayList<>(batches.size() + 1);
       if (!opsFailedInLookup.isEmpty()) {
-        batchResponses.add(Deferred.fromResult(new BatchResponse(opsFailedInLookup)));
+        batchResponses.add(
+            Deferred.fromResult(new BatchResponse(opsFailedInLookup, opsFailedIndexesList)));
       }
 
       for (Batch batch : batches.values()) {
@@ -479,12 +484,18 @@ public class AsyncKuduSession implements SessionConfiguration {
         size += batchResponse.getIndividualResponses().size();
       }
 
-      ArrayList<OperationResponse> responses = new ArrayList<>(size);
+      OperationResponse[] responses = new OperationResponse[size];
       for (BatchResponse batchResponse : batchResponses) {
-        responses.addAll(batchResponse.getIndividualResponses());
+        List<OperationResponse> responseList = batchResponse.getIndividualResponses();
+        List<Integer> indexList = batchResponse.getResponseIndexes();
+        for (int i = 0; i < indexList.size(); i++) {
+          int index = indexList.get(i);
+          assert responses[index] == null;
+          responses[index] = responseList.get(i);
+        }
       }
 
-      return responses;
+      return Arrays.asList(responses);
     }
 
     @Override
@@ -690,6 +701,7 @@ public class AsyncKuduSession implements SessionConfiguration {
    */
   private void addBatchCallbacks(final Batch request) {
     final class BatchCallback implements Callback<BatchResponse, BatchResponse> {
+      @Override
       public BatchResponse call(final BatchResponse response) {
         LOG.trace("Got a Batch response for {} rows", request.operations.size());
         if (response.getWriteTimestamp() != 0) {
@@ -746,7 +758,7 @@ public class AsyncKuduSession implements SessionConfiguration {
         // Note that returning an object that's not an exception will make us leave the
         // errback chain. Effectively, the BatchResponse below will end up as part of the list
         // passed to ConvertBatchToListOfResponsesCB.
-        return handleKuduException ? new BatchResponse(responses) : e;
+        return handleKuduException ? new BatchResponse(responses, request.operationIndexes) : e;
       }
 
       @Override
@@ -786,6 +798,7 @@ public class AsyncKuduSession implements SessionConfiguration {
    * {@link FlushMode#AUTO_FLUSH_BACKGROUND}.
    */
   private final class FlusherTask implements TimerTask {
+    @Override
     public void run(final Timeout timeout) {
       Buffer buffer = null;
       synchronized (monitor) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/b6889d7a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
index ff49137..f5ebd24 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
@@ -43,6 +43,8 @@ class Batch extends KuduRpc<BatchResponse> {
 
   /** Holds batched operations. */
   final List<Operation> operations = new ArrayList<>();
+  /** Holds indexes of operations in the original user's batch. */
+  final List<Integer> operationIndexes = new ArrayList<>();
 
   /** The tablet this batch will be routed to. */
   private final LocatedTablet tablet;
@@ -75,7 +77,7 @@ class Batch extends KuduRpc<BatchResponse> {
     return this.rowOperationsSizeBytes;
   }
 
-  public void add(Operation operation) {
+  public void add(Operation operation, int index) {
     assert Bytes.memcmp(operation.partitionKey(),
                         tablet.getPartition().getPartitionKeyStart()) >= 0 &&
            (tablet.getPartition().getPartitionKeyEnd().length == 0 ||
@@ -83,6 +85,7 @@ class Batch extends KuduRpc<BatchResponse> {
                          tablet.getPartition().getPartitionKeyEnd()) < 0);
 
     operations.add(operation);
+    operationIndexes.add(index);
   }
 
   @Override
@@ -127,7 +130,8 @@ class Batch extends KuduRpc<BatchResponse> {
     }
 
     BatchResponse response = new BatchResponse(deadlineTracker.getElapsedMillis(), tsUUID,
-                                               builder.getTimestamp(), errorsPB, operations);
+                                               builder.getTimestamp(), errorsPB, operations,
+                                               operationIndexes);
 
     if (injectedError != null) {
       if (injectedlatencyMs > 0) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/b6889d7a/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java
index d32793f..a426ac4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/BatchResponse.java
@@ -36,6 +36,7 @@ public class BatchResponse extends KuduRpcResponse {
   private final long writeTimestamp;
   private final List<RowError> rowErrors;
   private final List<OperationResponse> individualResponses;
+  private final List<Integer> responsesIndexes;
 
   /**
    * Package-private constructor to be used by the RPCs.
@@ -43,13 +44,15 @@ public class BatchResponse extends KuduRpcResponse {
    * @param writeTimestamp HT's write timestamp
    * @param errorsPB a list of row errors, can be empty
    * @param operations the list of operations which created this response
+   * @param indexes the list of operations' order index
    */
   BatchResponse(long elapsedMillis, String tsUUID, long writeTimestamp,
                 List<Tserver.WriteResponsePB.PerRowErrorPB> errorsPB,
-                List<Operation> operations) {
+                List<Operation> operations, List<Integer> indexes) {
     super(elapsedMillis, tsUUID);
     this.writeTimestamp = writeTimestamp;
     individualResponses = new ArrayList<>(operations.size());
+    this.responsesIndexes = indexes;
     if (errorsPB.isEmpty()) {
       rowErrors = Collections.emptyList();
     } else {
@@ -77,13 +80,15 @@ public class BatchResponse extends KuduRpcResponse {
     }
     assert (rowErrors.size() == errorsPB.size());
     assert (individualResponses.size() == operations.size());
+    assert (individualResponses.size() == responsesIndexes.size());
   }
 
-  BatchResponse(List<OperationResponse> individualResponses) {
+  BatchResponse(List<OperationResponse> individualResponses, List<Integer> indexes) {
     super(0, null);
     writeTimestamp = 0;
     rowErrors = ImmutableList.of();
     this.individualResponses = individualResponses;
+    this.responsesIndexes = indexes;
   }
 
   /**
@@ -103,4 +108,12 @@ public class BatchResponse extends KuduRpcResponse {
     return individualResponses;
   }
 
+  /**
+   * Package-private method to get the responses' order index.
+   * @return a list of indexes
+   */
+  List<Integer> getResponseIndexes() {
+    return responsesIndexes;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/b6889d7a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java
index 122ed21..582cda3 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduSession.java
@@ -275,6 +275,37 @@ public class TestKuduSession extends BaseKuduTest {
   }
 
   @Test(timeout = 10000)
+  public void testInsertManualFlushResponseOrder() throws Exception {
+    String tableName = name.getMethodName();
+    CreateTableOptions createOptions = getBasicTableOptionsWithNonCoveredRange();
+    createOptions.setNumReplicas(1);
+    syncClient.createTable(tableName, basicSchema, createOptions);
+    KuduTable table = syncClient.openTable(tableName);
+
+    KuduSession session = syncClient.newSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+
+    // Insert a batch of some valid and some invalid.
+    for (int i = 0; i < 10; i++) {
+      assertNull(session.apply(createBasicSchemaInsert(table, 100 + i * 10)));
+      assertNull(session.apply(createBasicSchemaInsert(table, 200 + i * 10)));
+    }
+    List<OperationResponse> results = session.flush();
+
+    assertEquals(20, results.size());
+
+    for (int i = 0; i < 20; i++) {
+      OperationResponse result = results.get(i);
+      if (i % 2 == 0) {
+        assertTrue(result.hasRowError());
+        assertTrue(result.getRowError().getErrorStatus().isNotFound());
+      } else {
+        assertTrue(!result.hasRowError());
+      }
+    }
+  }
+
+  @Test(timeout = 10000)
   public void testInsertAutoFlushSyncNonCoveredRange() throws Exception {
     String tableName = name.getMethodName();
     CreateTableOptions createOptions = getBasicTableOptionsWithNonCoveredRange();


Mime
View raw message