kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jdcry...@apache.org
Subject [1/3] incubator-kudu git commit: [java client] Add a method to return rows in order
Date Fri, 06 May 2016 21:33:13 GMT
Repository: incubator-kudu
Updated Branches:
  refs/heads/master 97669cdfc -> 345fc0cd0


[java client] Add a method to return rows in order

This patch adds an unstable, package-private API on the scanners to request
that rows be returned sorted by primary key. It is meant for more advanced
users who understand what they are doing. It will remain package private
until the client internally checks that, if the table is hash partitioned,
all hash columns have equality predicates.

The new test changes how ITScannerMultiTablet behaves. It now inserts more
rows, using random keys, into a hash-partitioned table. That, plus a new
tablet server (TS) flag to flush after 1MB, makes it possible to test this
new method.

FWIW the change to 1MB flush applies to all the Java client tests. It could
be made configurable but I haven't found a need for it yet.

Change-Id: I865e28bddd945111ee159b6f2715a8629b75743b
Reviewed-on: http://gerrit.cloudera.org:8080/2951
Reviewed-by: Jean-Daniel Cryans
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/97d48691
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/97d48691
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/97d48691

Branch: refs/heads/master
Commit: 97d48691e74a674e5f5af66be2f8eaf1ca0e29a4
Parents: 97669cd
Author: Jean-Daniel Cryans <jdcryans@apache.org>
Authored: Wed May 4 10:47:48 2016 -0700
Committer: Jean-Daniel Cryans <jdcryans@gerrit.cloudera.org>
Committed: Fri May 6 21:32:26 2016 +0000

----------------------------------------------------------------------
 .../client/AbstractKuduScannerBuilder.java      | 22 ++++++++
 .../org/kududb/client/AsyncKuduScanner.java     | 19 +++++--
 .../java/org/kududb/client/KuduScanner.java     |  2 +-
 .../org/kududb/client/ITScannerMultiTablet.java | 53 ++++++++++++++++----
 .../java/org/kududb/client/MiniKuduCluster.java |  1 +
 5 files changed, 83 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/97d48691/java/kudu-client/src/main/java/org/kududb/client/AbstractKuduScannerBuilder.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AbstractKuduScannerBuilder.java b/java/kudu-client/src/main/java/org/kududb/client/AbstractKuduScannerBuilder.java
index 0b1e60b..b1b0712 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AbstractKuduScannerBuilder.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AbstractKuduScannerBuilder.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableList;
+import org.kududb.Common;
 import org.kududb.annotations.InterfaceAudience;
 import org.kududb.annotations.InterfaceStability;
 import org.kududb.tserver.Tserver;
@@ -40,6 +41,7 @@ public abstract class AbstractKuduScannerBuilder
   final Map<String, KuduPredicate> predicates = new HashMap<>();
 
   AsyncKuduScanner.ReadMode readMode = AsyncKuduScanner.ReadMode.READ_LATEST;
+  Common.OrderMode orderMode = Common.OrderMode.UNORDERED;
   int batchSizeBytes = 1024*1024;
   long limit = Long.MAX_VALUE;
   boolean prefetching = false;
@@ -70,6 +72,26 @@ public abstract class AbstractKuduScannerBuilder
   }
 
   /**
+   * Return scan results in primary key sorted order.
+   *
+   * If the table is hash partitioned, the scan must have an equality predicate
+   * on all hashed columns.
+   *
+   * Package private until proper hash partitioning equality predicate checks
+   * are in place.
+   *
+   * Disabled by default.
+   * @return this instance
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  S sortResultsByPrimaryKey() {
+    orderMode = Common.OrderMode.ORDERED;
+    readMode = AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT;
+    return (S) this;
+  }
+
+  /**
    * Adds a predicate for a column.
    * @param predicate predicate for a column to add
    * @return this instance

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/97d48691/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
index 4365f15..e0f863d 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
@@ -122,7 +122,7 @@ public final class AsyncKuduScanner {
     READ_AT_SNAPSHOT(Common.ReadMode.READ_AT_SNAPSHOT);
 
     private Common.ReadMode pbVersion;
-    private ReadMode(Common.ReadMode pbVersion) {
+    ReadMode(Common.ReadMode pbVersion) {
       this.pbVersion = pbVersion;
     }
 
@@ -186,6 +186,8 @@ public final class AsyncKuduScanner {
 
   private final ReadMode readMode;
 
+  private final Common.OrderMode orderMode;
+
   private final long htTimestamp;
 
   /////////////////////
@@ -229,7 +231,8 @@ public final class AsyncKuduScanner {
   private static final AtomicBoolean PARTITION_PRUNE_WARN = new AtomicBoolean(true);
 
   AsyncKuduScanner(AsyncKuduClient client, KuduTable table, List<String> projectedNames,
-                   List<Integer> projectedIndexes, ReadMode readMode, long scanRequestTimeout,
+                   List<Integer> projectedIndexes, ReadMode readMode, Common.OrderMode orderMode,
+                   long scanRequestTimeout,
                    Map<String, KuduPredicate> predicates, long limit,
                    boolean cacheBlocks, boolean prefetching,
                    byte[] startPrimaryKey, byte[] endPrimaryKey,
@@ -245,10 +248,15 @@ public final class AsyncKuduScanner {
       checkArgument(readMode == ReadMode.READ_AT_SNAPSHOT, "When specifying a " +
           "HybridClock timestamp, the read mode needs to be set to READ_AT_SNAPSHOT");
     }
+    if (orderMode == Common.OrderMode.ORDERED) {
+      checkArgument(readMode == ReadMode.READ_AT_SNAPSHOT, "Returning rows in primary key order " +
+          "requires the read mode to be set to READ_AT_SNAPSHOT");
+    }
 
     this.client = client;
     this.table = table;
     this.readMode = readMode;
+    this.orderMode = orderMode;
     this.scanRequestTimeout = scanRequestTimeout;
     this.predicates = predicates;
     this.limit = limit;
@@ -383,6 +391,10 @@ public final class AsyncKuduScanner {
     return this.readMode;
   }
 
+  private Common.OrderMode getOrderMode() {
+    return this.orderMode;
+  }
+
   /**
    * Returns the projection schema of this scanner. If specific columns were
    * not specified during scanner creation, the table schema is returned.
@@ -713,6 +725,7 @@ public final class AsyncKuduScanner {
           newBuilder.addAllProjectedColumns(ProtobufHelper.schemaToListPb(schema));
           newBuilder.setTabletId(ZeroCopyLiteralByteString.wrap(tablet.getTabletIdAsBytes()));
           newBuilder.setReadMode(AsyncKuduScanner.this.getReadMode().pbVersion());
+          newBuilder.setOrderMode(AsyncKuduScanner.this.getOrderMode());
           newBuilder.setCacheBlocks(cacheBlocks);
           // if the last propagated timestamp is set send it with the scan
           if (table.getAsyncClient().getLastPropagatedTimestamp() != AsyncKuduClient.NO_TIMESTAMP) {
@@ -829,7 +842,7 @@ public final class AsyncKuduScanner {
      */
     public AsyncKuduScanner build() {
       return new AsyncKuduScanner(
-          client, table, projectedColumnNames, projectedColumnIndexes, readMode,
+          client, table, projectedColumnNames, projectedColumnIndexes, readMode, orderMode,
           scanRequestTimeout, predicates, limit, cacheBlocks,
           prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey,
           lowerBoundPartitionKey, upperBoundPartitionKey,

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/97d48691/java/kudu-client/src/main/java/org/kududb/client/KuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/KuduScanner.java b/java/kudu-client/src/main/java/org/kududb/client/KuduScanner.java
index 4756279..d2d2e8e 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/KuduScanner.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/KuduScanner.java
@@ -129,7 +129,7 @@ public class KuduScanner {
      */
     public KuduScanner build() {
       return new KuduScanner(new AsyncKuduScanner(
-          client, table, projectedColumnNames, projectedColumnIndexes, readMode,
+          client, table, projectedColumnNames, projectedColumnIndexes, readMode, orderMode,
           scanRequestTimeout, predicates, limit, cacheBlocks,
           prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey,
           lowerBoundPartitionKey, upperBoundPartitionKey,

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/97d48691/java/kudu-client/src/test/java/org/kududb/client/ITScannerMultiTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/ITScannerMultiTablet.java b/java/kudu-client/src/test/java/org/kududb/client/ITScannerMultiTablet.java
index a79217d..13465f9 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/ITScannerMultiTablet.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/ITScannerMultiTablet.java
@@ -16,11 +16,15 @@
 // under the License.
 package org.kududb.client;
 
+import com.google.common.collect.Lists;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.kududb.Schema;
 
-import static org.junit.Assert.*;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Integration test that inserts enough data to trigger flushes and getting multiple data
@@ -30,24 +34,23 @@ public class ITScannerMultiTablet extends BaseKuduTest {
 
   private static final String TABLE_NAME =
       ITScannerMultiTablet.class.getName()+"-"+System.currentTimeMillis();
-  private static final int ROW_COUNT = 3000;
+  private static final int ROW_COUNT = 20000;
   private static final int TABLET_COUNT = 3;
 
   private static Schema schema = getBasicSchema();
   private static KuduTable table;
 
+  private static Random random = new Random(1234);
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     BaseKuduTest.setUpBeforeClass();
 
     CreateTableOptions builder = new CreateTableOptions();
 
-    int step = ROW_COUNT / TABLET_COUNT;
-    for (int i = step; i < ROW_COUNT; i += step){
-      PartialRow splitRow = getBasicSchema().newPartialRow();
-      splitRow.addInt(0, i);
-      builder.addSplitRow(splitRow);
-    }
+    builder.addHashPartitions(
+        Lists.newArrayList(schema.getColumnByIndex(0).getName()),
+        TABLET_COUNT);
 
     table = createTable(TABLE_NAME, schema, builder);
 
@@ -59,7 +62,7 @@ public class ITScannerMultiTablet extends BaseKuduTest {
     for (int i = 0; i < ROW_COUNT; i++) {
       Insert insert = table.newInsert();
       PartialRow row = insert.getRow();
-      row.addInt(0, i);
+      row.addInt(0, random.nextInt());
       row.addInt(1, i);
       row.addInt(2, i);
       row.addString(3, new String(chars));
@@ -74,7 +77,7 @@ public class ITScannerMultiTablet extends BaseKuduTest {
    * Test for KUDU-1343 with a multi-batch multi-tablet scan.
    */
   @Test(timeout = 100000)
-  public void test() throws Exception {
+  public void testKudu1343() throws Exception {
     KuduScanner scanner = syncClient.newScannerBuilder(table)
         .batchSizeBytes(1) // Just a hint, won't actually be that small
         .build();
@@ -93,4 +96,34 @@ public class ITScannerMultiTablet extends BaseKuduTest {
     assertTrue(loopCount > TABLET_COUNT);
     assertEquals(ROW_COUNT, rowCount);
   }
+
+  /**
+   * Makes sure we pass all the correct information down to the server by verifying we get rows in
+   * order from 4 tablets. We detect those tablet boundaries when keys suddenly become smaller than
+   * what was previously seen.
+   */
+  @Test(timeout = 100000)
+  public void testSortResultsByPrimaryKey() throws Exception {
+    KuduScanner scanner = syncClient.newScannerBuilder(table)
+        .sortResultsByPrimaryKey()
+        .setProjectedColumnIndexes(Lists.newArrayList(0))
+        .build();
+
+    int rowCount = 0;
+    int previousRow = -1;
+    int tableBoundariesCount = 0;
+    while(scanner.hasMoreRows()) {
+      RowResultIterator rri = scanner.nextRows();
+      while (rri.hasNext()) {
+        int key = rri.next().getInt(0);
+        if (key < previousRow) {
+          tableBoundariesCount++;
+        }
+        previousRow = key;
+        rowCount++;
+      }
+    }
+    assertEquals(ROW_COUNT, rowCount);
+    assertEquals(TABLET_COUNT, tableBoundariesCount);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/97d48691/java/kudu-client/src/test/java/org/kududb/client/MiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/MiniKuduCluster.java b/java/kudu-client/src/test/java/org/kududb/client/MiniKuduCluster.java
index 3135b3f..448ebbd 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/MiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/MiniKuduCluster.java
@@ -122,6 +122,7 @@ public class MiniKuduCluster implements AutoCloseable {
           "--flagfile=" + flagsPath,
           "--fs_wal_dir=" + dataDirPath,
           "--fs_data_dirs=" + dataDirPath,
+          "--flush_threshold_mb=1",
           "--tserver_master_addrs=" + masterAddresses,
           "--webserver_interface=" + localhost,
           "--local_ip_for_outbound_sockets=" + localhost,


Mime
View raw message