kudu-commits mailing list archives

From granthe...@apache.org
Subject [kudu] branch master updated: [backup] Support partition alterations between Kudu backups
Date Thu, 02 May 2019 13:35:18 GMT
This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 5312dd8  [backup] Support partition alterations between Kudu backups
5312dd8 is described below

commit 5312dd8fb058066c93c7b0c81e050dc4a28dc10c
Author: Grant Henke <granthenke@apache.org>
AuthorDate: Mon Apr 29 17:04:50 2019 -0500

    [backup] Support partition alterations between Kudu backups
    
    This patch enables partitions to be added and dropped
    between backups.
    
    It does this by adding tablet and partition details to the
    metadata; these details can be used to detect if a partition
    was dropped. It filters out partitions that are no longer
    valid and adds the remaining partitions to a
    KuduPartitioner. That KuduPartitioner can then be used
    to filter out non-covered rows.
    
    This does require some previously package-private
    classes and constructors to be marked as public.
    In those cases I have tagged them as Private and Unstable
    using the interface annotations.
    
    Change-Id: I31e0eb27f163c38840e5466ff85d0b4a44d4ec0a
    Reviewed-on: http://gerrit.cloudera.org:8080/13191
    Reviewed-by: Grant Henke <granthenke@apache.org>
    Tested-by: Grant Henke <granthenke@apache.org>
---
 java/kudu-backup/src/main/protobuf/backup.proto    |  19 +++-
 .../scala/org/apache/kudu/backup/KuduRestore.scala |  86 +++++++++++++-----
 .../org/apache/kudu/backup/TableMetadata.scala     |  42 +++++++--
 .../org/apache/kudu/backup/TestKuduBackup.scala    |  62 +++++++++++++
 .../org/apache/kudu/client/KuduPartitioner.java    | 100 +++++++++++++++------
 .../kudu/client/NonCoveredRangeException.java      |   2 +-
 .../java/org/apache/kudu/client/Partition.java     |   2 +-
 .../org/apache/kudu/client/PartitionSchema.java    |   6 +-
 8 files changed, 256 insertions(+), 63 deletions(-)
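
The mechanism at the core of this change is small: intersect the tablets
recorded in the backup being restored with the tablets recorded in the most
recent backup, build a KuduPartitioner over the survivors, and drop any row
the partitioner no longer covers. A minimal sketch of that flow in Scala,
with hypothetical currentTablets/lastTablets inputs standing in for the
metadata lookups in the diffs below:

    import scala.collection.JavaConverters._
    import org.apache.kudu.client.{KuduPartitioner, PartialRow, Partition, PartitionSchema}

    // Keep only the tablets that still exist in the last backup's metadata and
    // build a partitioner over the surviving partitions (hypothetical inputs).
    def buildFilter(
        currentTablets: Map[String, Partition],
        lastTablets: Map[String, Partition],
        partitionSchema: PartitionSchema): KuduPartitioner = {
      val valid = currentTablets.filter { case (id, _) => lastTablets.contains(id) }
      new KuduPartitioner(partitionSchema, valid.asJava)
    }

    // A row from a dropped partition falls into a non-covered range and is
    // skipped rather than written during restore.
    def shouldRestore(partitioner: KuduPartitioner, row: PartialRow): Boolean =
      partitioner.isCovered(row)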

diff --git a/java/kudu-backup/src/main/protobuf/backup.proto b/java/kudu-backup/src/main/protobuf/backup.proto
index 5971e02..5711e46 100644
--- a/java/kudu-backup/src/main/protobuf/backup.proto
+++ b/java/kudu-backup/src/main/protobuf/backup.proto
@@ -90,11 +90,19 @@ message HashPartitionMetadataPB {
 
 // Maps to PartitionSchema class.
 // The fields are effectively 1 to 1 mappings of those in PartitionSchema.
-message PartitionMetadataPB {
+message PartitionSchemaMetadataPB {
   repeated HashPartitionMetadataPB hash_partitions = 1;
   RangePartitionMetadataPB range_partitions = 2;
 }
 
+// Maps to Partition class.
+// The fields are effectively 1 to 1 mappings of those in Partition.
+message PartitionMetadataPB {
+  bytes partition_key_start = 1;
+  bytes partition_key_end = 2;
+  repeated int32 hash_buckets = 3;
+}
+
 // Represents the metadata of a table backup. This metadata is output
 // so we can understand and create a table that matches the backed up
 // table on restore.
@@ -118,8 +126,13 @@ message TableMetadataPB {
   // The metadata for the table's columns.
   repeated ColumnMetadataPB columns = 8;
   // A map of column name to internal column id.
+  // This is for validation only and is not used when creating the restored table.
   // This is useful for detecting dropped and added columns.
   map<string, int32> column_ids = 9;
-  // The metadata for the table's partitions.
-  PartitionMetadataPB partitions = 10;
+  // The metadata for the table's partition schema.
+  PartitionSchemaMetadataPB partitions = 10;
+  // A map of tablet ID to the tablet's partition metadata.
+  // This is for validation only and is not used when creating the restored table.
+  // This is useful for detecting dropped and added partitions.
+  map<string, PartitionMetadataPB> tablets = 11;
 }
\ No newline at end of file
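
With the new tablets map in place, a dropped partition is detectable as a
tablet ID that appears in an earlier backup's TableMetadataPB but not in a
later one. A minimal sketch, assuming two deserialized metadata messages:

    import scala.collection.JavaConverters._
    import org.apache.kudu.backup.Backup.TableMetadataPB

    // Tablet IDs present in the older backup but missing from the newer one
    // correspond to partitions dropped between the two backups.
    def droppedTabletIds(older: TableMetadataPB, newer: TableMetadataPB): Set[String] =
      older.getTabletsMap.keySet.asScala.toSet -- newer.getTabletsMap.keySet.asScala
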
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index bf6103c..d5fe12c 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -18,6 +18,9 @@ package org.apache.kudu.backup
 
 import org.apache.kudu.backup.Backup.TableMetadataPB
 import org.apache.kudu.client.AlterTableOptions
+import org.apache.kudu.client.KuduPartitioner
+import org.apache.kudu.client.NonCoveredRangeException
+import org.apache.kudu.client.Partition
 import org.apache.kudu.client.SessionConfiguration.FlushMode
 import org.apache.kudu.spark.kudu.KuduContext
 import org.apache.kudu.spark.kudu.RowConverter
@@ -54,8 +57,9 @@ object KuduRestore {
       val lastMetadata = graph.restorePath.backups.last.metadata
       graph.restorePath.backups.foreach { backup =>
         log.info(s"Restoring table $tableName from path: ${backup.path}")
-        val isFullRestore = backup.metadata.getFromMs == 0
-        val restoreName = s"${backup.metadata.getTableName}${options.tableSuffix}"
+        val metadata = backup.metadata
+        val isFullRestore = metadata.getFromMs == 0
+        val restoreName = s"${metadata.getTableName}${options.tableSuffix}"
 
         // TODO (KUDU-2788): Store the full metadata to compare/validate for each applied partial.
 
@@ -68,13 +72,12 @@ object KuduRestore {
             createTableRangePartitionByRangePartition(restoreName, lastMetadata, context)
           }
         }
-
-        val backupSchema = io.dataSchema(TableMetadata.getKuduSchema(backup.metadata))
+        val backupSchema = io.dataSchema(TableMetadata.getKuduSchema(metadata))
         val rowActionCol = backupSchema.fields.last.name
         val table = context.syncClient.openTable(restoreName)
 
         var data = session.sqlContext.read
-          .format(backup.metadata.getDataFormat)
+          .format(metadata.getDataFormat)
           .schema(backupSchema)
           .load(backup.path.toString)
           // Default the row action column with a value of "UPSERT" so that the
@@ -83,32 +86,36 @@ object KuduRestore {
           .fill(RowAction.UPSERT.getValue, Seq(rowActionCol))
 
         // Adjust for dropped and renamed columns.
-        data = adjustSchema(data, backup.metadata, lastMetadata, rowActionCol)
+        data = adjustSchema(data, metadata, lastMetadata, rowActionCol)
         val restoreSchema = data.schema
 
         // Write the data to Kudu.
         data.queryExecution.toRdd.foreachPartition { internalRows =>
           val table = context.syncClient.openTable(restoreName)
           val converter = new RowConverter(table.getSchema, restoreSchema, false)
+          val partitioner = createPartitionFilter(metadata, lastMetadata)
           val session = context.syncClient.newSession
           session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
-          try {
-            for (internalRow <- internalRows) {
-              // Convert the InternalRows to Rows.
-              // This avoids any corruption as reported in SPARK-26880.
-              val row = converter.toRow(internalRow)
-              // Get the operation type based on the row action column.
-              // This will always be the last column in the row.
-              val rowActionValue = row.getByte(row.length - 1)
-              val rowAction = RowAction.fromValue(rowActionValue)
-              // Generate an operation based on the row action.
-              val operation = rowAction match {
-                case RowAction.UPSERT => table.newUpsert()
-                case RowAction.DELETE => table.newDelete()
-                case _ => throw new IllegalStateException(s"Unsupported RowAction: $rowAction")
-              }
-              // Convert the Spark row to a partial row and set it on the operation.
-              val partialRow = converter.toPartialRow(row)
+          try for (internalRow <- internalRows) {
+            // Convert the InternalRows to Rows.
+            // This avoids any corruption as reported in SPARK-26880.
+            val row = converter.toRow(internalRow)
+            // Get the operation type based on the row action column.
+            // This will always be the last column in the row.
+            val rowActionValue = row.getByte(row.length - 1)
+            val rowAction = RowAction.fromValue(rowActionValue)
+            // Generate an operation based on the row action.
+            val operation = rowAction match {
+              case RowAction.UPSERT => table.newUpsert()
+              case RowAction.DELETE => table.newDelete()
+              case _ => throw new IllegalStateException(s"Unsupported RowAction: $rowAction")
+            }
+            // Convert the Spark row to a partial row and set it on the operation.
+            val partialRow = converter.toPartialRow(row)
+            // Drop rows that are not covered by the partitioner. This is how we
+            // detect a partition which was dropped between backups and filter
+            // out the rows from that dropped partition.
+            if (partitioner.isCovered(partialRow)) {
               operation.setRow(partialRow)
               session.apply(operation)
             }
@@ -191,6 +198,39 @@ object KuduRestore {
     result
   }
 
+  /**
+   * Creates a KuduPartitioner that can be used to filter out rows in the current
+   * backup's data that no longer map to a partition in the last metadata.
+   *
+   * To do this, the tablet metadata in the current backup is compared to the last
+   * metadata: tablet IDs that are absent from the last metadata are filtered out,
+   * and the remaining tablet metadata is used to create a KuduPartitioner. The
+   * resulting partitioner can then be used to filter out rows that are no longer
+   * valid, because those rows fall into a non-covered range.
+   */
+  private def createPartitionFilter(
+      currentMetadata: TableMetadataPB,
+      lastMetadata: TableMetadataPB): KuduPartitioner = {
+    val lastTablets = lastMetadata.getTabletsMap
+    val validTablets =
+      currentMetadata.getTabletsMap.asScala.flatMap {
+        case (id, pm) =>
+          if (lastTablets.containsKey(id)) {
+            // Create the partition object needed for the KuduPartitioner.
+            val partition = new Partition(
+              pm.getPartitionKeyStart.toByteArray,
+              pm.getPartitionKeyEnd.toByteArray,
+              pm.getHashBucketsList)
+            Some((id, partition))
+          } else {
+            // Ignore tablets that are no longer valid.
+            None
+          }
+      }
+    val partitionSchema = TableMetadata.getPartitionSchema(currentMetadata)
+    new KuduPartitioner(partitionSchema, validTablets.asJava)
+  }
+
   def main(args: Array[String]): Unit = {
     val options = RestoreOptions
       .parse(args)
diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
index f09e3d2..cfd3933 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
@@ -19,6 +19,7 @@ package org.apache.kudu.backup
 import java.math.BigDecimal
 import java.util
 
+import com.google.protobuf.ByteString
 import com.google.protobuf.StringValue
 import org.apache.commons.net.util.Base64
 import org.apache.kudu.backup.Backup._
@@ -29,14 +30,17 @@ import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder
 import org.apache.kudu.client.CreateTableOptions
 import org.apache.kudu.client.KuduTable
 import org.apache.kudu.client.PartialRow
+import org.apache.kudu.client.PartitionSchema
 import org.apache.kudu.ColumnSchema
 import org.apache.kudu.Schema
 import org.apache.kudu.Type
+import org.apache.kudu.client.KuduPartitioner.KuduPartitionerBuilder
+import org.apache.kudu.client.PartitionSchema.HashBucketSchema
+import org.apache.kudu.client.PartitionSchema.RangeSchema
 import org.apache.yetus.audience.InterfaceAudience
 import org.apache.yetus.audience.InterfaceStability
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -68,6 +72,18 @@ object TableMetadata {
       builder.build()
     }
 
+    val partitioner = new KuduPartitionerBuilder(table).build()
+    val tablets = partitioner.getTabletMap.asScala.map {
+      case (id, partition) =>
+        val metadata = PartitionMetadataPB
+          .newBuilder()
+          .setPartitionKeyStart(ByteString.copyFrom(partition.getPartitionKeyStart))
+          .setPartitionKeyEnd(ByteString.copyFrom(partition.getPartitionKeyEnd))
+          .addAllHashBuckets(partition.getHashBuckets)
+          .build()
+        (id, metadata)
+    }
+
     TableMetadataPB
       .newBuilder()
       .setVersion(MetadataVersion)
@@ -79,7 +95,8 @@ object TableMetadata {
       .addAllColumns(columns.asJava)
       .putAllColumnIds(columnIds)
       .setNumReplicas(table.getNumReplicas)
-      .setPartitions(getPartitionMetadata(table))
+      .setPartitions(getPartitionSchemaMetadata(table))
+      .putAllTablets(tablets.asJava)
       .build()
   }
 
@@ -92,10 +109,10 @@ object TableMetadata {
       .build()
   }
 
-  private def getPartitionMetadata(table: KuduTable): PartitionMetadataPB = {
+  private def getPartitionSchemaMetadata(table: KuduTable): PartitionSchemaMetadataPB = {
     val hashPartitions = getHashPartitionsMetadata(table)
     val rangePartitions = getRangePartitionMetadata(table)
-    PartitionMetadataPB
+    PartitionSchemaMetadataPB
       .newBuilder()
       .addAllHashPartitions(hashPartitions.asJava)
       .setRangePartitions(rangePartitions)
@@ -199,7 +216,9 @@ object TableMetadata {
       }
       builder.build()
     }
-    new Schema(columns.asJava)
+    val toId = metadata.getColumnIdsMap.asScala
+    val colIds = metadata.getColumnsList.asScala.map(_.getName).map(toId)
+    new Schema(columns.asJava, colIds.asJava)
   }
 
   private def getValue(row: PartialRow, columnName: String, colType: Type): Any = {
@@ -307,4 +326,17 @@ object TableMetadata {
       (lower, upper)
     }
   }
+
+  def getPartitionSchema(metadata: TableMetadataPB): PartitionSchema = {
+    val colNameToId = metadata.getColumnIdsMap.asScala
+    val schema = getKuduSchema(metadata)
+    val rangeIds =
+      metadata.getPartitions.getRangePartitions.getColumnNamesList.asScala.map(colNameToId)
+    val rangeSchema = new RangeSchema(rangeIds.asJava)
+    val hashSchemas = metadata.getPartitions.getHashPartitionsList.asScala.map { hp =>
+      val colIds = hp.getColumnNamesList.asScala.map(colNameToId)
+      new HashBucketSchema(colIds.asJava, hp.getNumBuckets, hp.getSeed)
+    }
+    new PartitionSchema(rangeSchema, hashSchemas.asJava, schema)
+  }
 }
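
Note that getKuduSchema above now attaches the backed-up column IDs to the
restored Schema. This matters because partition keys are encoded by column ID,
and the KuduPartitioner rejects rows whose schema lacks IDs. A minimal sketch
of the expectation, assuming a deserialized TableMetadataPB named metadata:

    val schema = TableMetadata.getKuduSchema(metadata)
    // The restored schema carries the original column IDs, so rows built from
    // it can be partition-key encoded by a KuduPartitioner.
    assert(schema.hasColumnIds)
    val row = schema.newPartialRow()
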
diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index dcc813a..f253743 100644
--- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -349,6 +349,68 @@ class TestKuduBackup extends KuduTestSuite {
     assertTrue(rows.forall(_.getString("col_a") == "default"))
   }
 
+  @Test
+  def testPartitionAlterHandling(): Unit = {
+    // Create a basic table with three 10-row range partitions covering keys 10 through 40.
+    val tableName = "testPartitionAlterHandling"
+    val ten = createPartitionRow(10)
+    val twenty = createPartitionRow(20)
+    val thirty = createPartitionRow(30)
+    val fourty = createPartitionRow(40)
+    val options = new CreateTableOptions()
+      .setRangePartitionColumns(List("key").asJava)
+      .addRangePartition(ten, twenty)
+      .addRangePartition(twenty, thirty)
+      .addRangePartition(thirty, fourty)
+    var table = kuduClient.createTable(tableName, schema, options)
+
+    // Fill the partitions with rows.
+    insertRows(table, 30, 10)
+
+    // Run a full backup on the table.
+    backupAndValidateTable(tableName, 30, false)
+
+    // Drop partition 10-20, drop and re-add partition 20-30, and add partitions 0-10 and 40-50.
+    // (drops 20 total rows)
+    val zero = createPartitionRow(0)
+    val fifty = createPartitionRow(50)
+    kuduClient.alterTable(
+      tableName,
+      new AlterTableOptions()
+        .dropRangePartition(ten, twenty)
+        .dropRangePartition(twenty, thirty)
+        .addRangePartition(twenty, thirty)
+        .addRangePartition(zero, ten)
+        .addRangePartition(fourty, fifty)
+    )
+
+    // Add some rows back to the new partitions (adds 15 total rows)
+    insertRows(table, 5, 0)
+    insertRows(table, 5, 20)
+    insertRows(table, 5, 40)
+
+    // Run an incremental backup on the table.
+    backupAndValidateTable(tableName, 15, true)
+
+    // Restore the table and validate.
+    doRestore(createRestoreOptions(Seq(tableName)))
+
+    val restoreTable = kuduClient.openTable(s"$tableName-restore")
+    val scanner = kuduClient.newScannerBuilder(restoreTable).build()
+    val rows = scanner.asScala.toList.map(_.getInt("key")).sorted
+    val expectedKeys =
+      (Range(0, 5) ++ Range(20, 25) ++ Range(30, 40) ++ Range(40, 45)).toList.sorted
+
+    assertEquals(25, rows.length)
+    assertEquals(expectedKeys, rows)
+  }
+
+  def createPartitionRow(value: Int): PartialRow = {
+    val row = schema.newPartialRow()
+    row.addInt("key", value)
+    row
+  }
+
   def createRandomTable(): KuduTable = {
     val columnCount = random.nextInt(50) + 1 // At least one column.
     val keyColumnCount = random.nextInt(columnCount) + 1 // At least one key.
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPartitioner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPartitioner.java
index 9dba46c..1649586 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPartitioner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPartitioner.java
@@ -21,10 +21,13 @@ import com.google.common.base.Preconditions;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 /**
  * A KuduPartitioner allows clients to determine the target partition of a
  * row without actually performing a write. The set of partitions is eagerly
@@ -42,15 +45,35 @@ public class KuduPartitioner {
   private static final int NON_COVERED_RANGE_INDEX = -1;
 
   private final PartitionSchema partitionSchema;
+  private final Map<String, Partition> tabletIdToPartition;
   private final NavigableMap<BytesKey, Integer> partitionByStartKey;
   private final int numPartitions;
 
-  KuduPartitioner(PartitionSchema partitionSchema,
-                  NavigableMap<BytesKey, Integer> partitionByStartKey,
-                  int numPartitions) {
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public KuduPartitioner(PartitionSchema partitionSchema,
+                        Map<String, Partition> tabletIdToPartition) {
+    // TODO(ghenke): Could also build a map of partition index to tablet ID which would
+    //  be useful for identifying which tablet a given row would come from.
+    NavigableMap<BytesKey, Integer> partitionByStartKey = new TreeMap<>();
+    // Insert a sentinel for the beginning of the table, in case a user
+    // queries for any row which falls before the first partition.
+    partitionByStartKey.put(EMPTY, NON_COVERED_RANGE_INDEX);
+    int index = 0;
+    for (Map.Entry<String, Partition> entry : tabletIdToPartition.entrySet()) {
+      BytesKey keyStart = new BytesKey(entry.getValue().partitionKeyStart);
+      BytesKey keyEnd = new BytesKey(entry.getValue().partitionKeyEnd);
+      partitionByStartKey.put(keyStart, index++);
+      // Set the start of the next non-covered range to have the NON_COVERED_RANGE_INDEX.
+      // As we process partitions, if a partition covers this range, the keyStart will be
+      // equal to this keyEnd and the NON_COVERED_RANGE_INDEX will be replaced with the index
+      // of that partition.
+      partitionByStartKey.putIfAbsent(keyEnd, NON_COVERED_RANGE_INDEX);
+    }
     this.partitionSchema = partitionSchema;
+    this.tabletIdToPartition = tabletIdToPartition;
     this.partitionByStartKey = partitionByStartKey;
-    this.numPartitions = numPartitions;
+    this.numPartitions = tabletIdToPartition.size();
   }
 
   /**
@@ -61,6 +84,22 @@ public class KuduPartitioner {
   }
 
   /**
+   * Determine if the given row falls into a valid partition.
+   *
+   * NOTE: The row must be constructed with a schema returned from the Kudu server.
+   * ex: `KuduTable.getSchema().newPartialRow();`
+   *
+   * @param row The row to check.
+   * @return true if the row falls into a valid partition.
+   */
+  public boolean isCovered(PartialRow row) {
+    BytesKey partitionKey = new BytesKey(encodePartitionKey(row));
+    // The greatest key that is less than or equal to the given key.
+    Map.Entry<BytesKey, Integer> floor = partitionByStartKey.floorEntry(partitionKey);
+    return floor.getValue() != NON_COVERED_RANGE_INDEX;
+  }
+
+  /**
    * Determine the partition index that the given row falls into.
    *
    * NOTE: The row must be constructed with a schema returned from the Kudu server.
@@ -72,11 +111,7 @@ public class KuduPartitioner {
    * @throws NonCoveredRangeException if the row falls into a non-covered range.
    */
   public int partitionRow(PartialRow row) throws NonCoveredRangeException {
-    // Column IDs are required to encode the partition key.
-    Preconditions.checkArgument(row.getSchema().hasColumnIds(),
-        "The row must be constructed with a schema returned from the server. " +
-            "(ex: KuduTable.getSchema().newPartialRow();");
-    BytesKey partitionKey = new BytesKey(partitionSchema.encodePartitionKey(row));
+    BytesKey partitionKey = new BytesKey(encodePartitionKey(row));
     // The greatest key that is less than or equal to the given key.
     Map.Entry<BytesKey, Integer> floor = partitionByStartKey.floorEntry(partitionKey);
     if (floor.getValue() == NON_COVERED_RANGE_INDEX) {
@@ -86,6 +121,23 @@ public class KuduPartitioner {
     return floor.getValue();
   }
 
+  private byte[] encodePartitionKey(PartialRow row) {
+    // Column IDs are required to encode the partition key.
+    Preconditions.checkArgument(row.getSchema().hasColumnIds(),
+        "The row must be constructed with a schema returned from the server. " +
+            "(ex: KuduTable.getSchema().newPartialRow();");
+    return partitionSchema.encodePartitionKey(row);
+  }
+
+  /**
+   * @return the internal map of tablet ID to Partition.
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public Map<String, Partition> getTabletMap() {
+    return tabletIdToPartition;
+  }
+
   /**
    * A wrapper around a byte array that implements the Comparable interface
    * allowing it to be used as a key in a map.
@@ -145,36 +197,30 @@ public class KuduPartitioner {
     public KuduPartitioner build() throws KuduException {
       final TimeoutTracker timeoutTracker = new TimeoutTracker();
       timeoutTracker.setTimeout(timeoutMillis);
-      NavigableMap<BytesKey, Integer> partitionByStartKey = new TreeMap<>();
-      // Insert a sentinel for the beginning of the table, in case a user
-      // queries for any row which falls before the first partition.
-      partitionByStartKey.put(EMPTY, NON_COVERED_RANGE_INDEX);
-      BytesKey nextPartKey = EMPTY;
-      int numPartitions = 0;
+      // Use a LinkedHashMap to maintain partition order.
+      // This isn't strictly required, but it means that partitions with lower
+      // ranges get lower partition indices, since the map is iterated in
+      // insertion order when constructing the KuduPartitioner.
+      LinkedHashMap<String, Partition> tabletIdToPartition = new LinkedHashMap<>();
+      byte[] nextPartKey = EMPTY.bytes;
       while (true) {
         LocatedTablet tablet;
         try {
           tablet = KuduClient.joinAndHandleException(
               table.getAsyncClient().getTabletLocation(table,
-                  nextPartKey.bytes, AsyncKuduClient.LookupType.LOWER_BOUND,
+                  nextPartKey, AsyncKuduClient.LookupType.LOWER_BOUND,
                   timeoutTracker.getMillisBeforeTimeout()));
         } catch (NonCoveredRangeException ncr) {
           // No more tablets
           break;
         }
-        BytesKey keyStart = new BytesKey(tablet.getPartition().partitionKeyStart);
-        BytesKey keyEnd = new BytesKey(tablet.getPartition().partitionKeyEnd);
-        partitionByStartKey.put(keyStart, numPartitions++);
-        if (keyEnd.isEmpty()) break;
-        // Set the start of the next non-covered range to have the NON_COVERED_RANGE_INDEX.
-        // As we process partitions, if a partition covers this range, the keyStart will be
-        // equal to this keyEnd and the NON_COVERED_RANGE_INDEX will be replaced with the index
-        // of that partition.
-        partitionByStartKey.put(keyEnd, NON_COVERED_RANGE_INDEX);
+        String tabletId = new String(tablet.getTabletId(), UTF_8);
+        tabletIdToPartition.put(tabletId, tablet.getPartition());
+        byte[] keyEnd = tablet.getPartition().partitionKeyEnd;
+        if (keyEnd.length == 0) break;
         nextPartKey = keyEnd;
       }
-      return new KuduPartitioner(table.getPartitionSchema(), partitionByStartKey, numPartitions);
+      return new KuduPartitioner(table.getPartitionSchema(), tabletIdToPartition);
     }
   }
-
 }
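
Together with its builder, the new isCovered method gives clients a local
per-row coverage check with no server round trip per row. A minimal usage
sketch, assuming an open KuduTable named table with an INT32 "key" column:

    import org.apache.kudu.client.KuduPartitioner.KuduPartitionerBuilder

    val partitioner = new KuduPartitionerBuilder(table).build()
    // The row must come from the server-returned schema so it carries column IDs.
    val row = table.getSchema.newPartialRow()
    row.addInt("key", 42)
    if (!partitioner.isCovered(row)) {
      // The key falls into a non-covered range; skip the write or report it.
    }
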
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/NonCoveredRangeException.java b/java/kudu-client/src/main/java/org/apache/kudu/client/NonCoveredRangeException.java
index 1c3f908..6f3efe2 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/NonCoveredRangeException.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/NonCoveredRangeException.java
@@ -23,7 +23,7 @@ import org.apache.yetus.audience.InterfaceAudience;
  * Exception indicating that an operation attempted to access a non-covered range partition.
  */
 @InterfaceAudience.Private
-class NonCoveredRangeException extends NonRecoverableException {
+public class NonCoveredRangeException extends NonRecoverableException {
   private final byte[] nonCoveredRangeStart;
   private final byte[] nonCoveredRangeEnd;
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Partition.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Partition.java
index 9ad6a2d..8c830e0 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Partition.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Partition.java
@@ -63,7 +63,7 @@ public class Partition implements Comparable<Partition> {
    * @param partitionKeyEnd the end partition key
    * @param hashBuckets the partition hash buckets
    */
-  Partition(byte[] partitionKeyStart,
+  public Partition(byte[] partitionKeyStart,
             byte[] partitionKeyEnd,
             List<Integer> hashBuckets) {
     this.partitionKeyStart = partitionKeyStart;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
index 502eea6..e340943 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartitionSchema.java
@@ -56,7 +56,7 @@ public class PartitionSchema {
    * @param hashBucketSchemas the hash bucket schemas
    * @param schema the table schema
    */
-  PartitionSchema(RangeSchema rangeSchema,
+  public PartitionSchema(RangeSchema rangeSchema,
                   List<HashBucketSchema> hashBucketSchemas,
                   Schema schema) {
     this.rangeSchema = rangeSchema;
@@ -105,7 +105,7 @@ public class PartitionSchema {
   public static class RangeSchema {
     private final List<Integer> columns;
 
-    RangeSchema(List<Integer> columns) {
+    public RangeSchema(List<Integer> columns) {
       this.columns = columns;
     }
 
@@ -133,7 +133,7 @@ public class PartitionSchema {
     private int numBuckets;
     private int seed;
 
-    HashBucketSchema(List<Integer> columnIds, int numBuckets, int seed) {
+    public HashBucketSchema(List<Integer> columnIds, int numBuckets, int seed) {
       this.columnIds = columnIds;
       this.numBuckets = numBuckets;
       this.seed = seed;

