kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From danburk...@apache.org
Subject [2/2] kudu git commit: [java client] Support add/remove partition
Date Wed, 10 Aug 2016 02:53:13 GMT
[java client] Support add/remove partition

This also sneaks a fix into catalog manager to change the status type when
rejecting invalid alter table partitioning operations.

Change-Id: If5da5f0d3e677e62256a8a6d2107093bbda44cde
Reviewed-on: http://gerrit.cloudera.org:8080/3854
Reviewed-by: Adar Dembo <adar@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/acf093c1
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/acf093c1
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/acf093c1

Branch: refs/heads/master
Commit: acf093c18860c6812f6759f0e9680db25e37ef99
Parents: cee7bdd
Author: Dan Burkert <dan@cloudera.com>
Authored: Fri Aug 5 10:58:08 2016 -0700
Committer: Dan Burkert <dan@cloudera.com>
Committed: Wed Aug 10 02:52:55 2016 +0000

----------------------------------------------------------------------
 .../apache/kudu/client/AlterTableOptions.java   |  89 +++++-
 .../apache/kudu/client/AlterTableRequest.java   |  18 +-
 .../org/apache/kudu/client/AsyncKuduClient.java | 291 ++++++++-----------
 .../kudu/client/GetTableLocationsRequest.java   |   1 +
 .../kudu/client/NonCoveredRangeCache.java       | 104 -------
 .../java/org/apache/kudu/client/Operation.java  |   7 +
 .../apache/kudu/client/TableLocationsCache.java | 279 ++++++++++++++++++
 .../org/apache/kudu/client/BaseKuduTest.java    |  23 +-
 .../org/apache/kudu/client/TestAlterTable.java  | 288 ++++++++++++++++++
 .../apache/kudu/client/TestAsyncKuduClient.java |   2 +-
 .../kudu/client/TestAsyncKuduSession.java       |   3 +-
 .../org/apache/kudu/client/TestKuduTable.java   |  50 +++-
 src/kudu/master/catalog_manager.cc              |  20 +-
 13 files changed, 880 insertions(+), 295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/acf093c1/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
index 1d97ca3..110443d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
@@ -16,6 +16,8 @@
 // under the License.
 package org.apache.kudu.client;
 
+import com.google.common.base.Preconditions;
+
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Type;
 import org.apache.kudu.annotations.InterfaceAudience;
@@ -30,7 +32,7 @@ import static org.apache.kudu.master.Master.AlterTableRequestPB;
 @InterfaceStability.Unstable
 public class AlterTableOptions {
 
-  AlterTableRequestPB.Builder pb = AlterTableRequestPB.newBuilder();
+  private final AlterTableRequestPB.Builder pb = AlterTableRequestPB.newBuilder();
 
   /**
    * Change a table's name.
@@ -104,4 +106,89 @@ public class AlterTableOptions {
         .setNewName(newName));
     return this;
   }
+
+  /**
+   * Add a range partition to the table with an inclusive lower bound and an exclusive upper bound.
+   *
+   * If either row is empty, then that end of the range will be unbounded. If a range column is
+   * missing a value, the logical minimum value for that column type will be used as the default.
+   *
+   * Multiple range partitions may be added as part of a single alter table transaction by calling
+   * this method multiple times. Added range partitions must not overlap with each
+   * other or any existing range partitions (unless the existing range partitions are dropped as
+   * part of the alter transaction first). The lower bound must be less than the upper bound.
+   *
+   * This client will immediately be able to write and scan the new tablets when the alter table
+   * operation returns success, however other existing clients may have to wait for a timeout period
+   * to elapse before the tablets become visible. This period is configured by the master's
+   * 'table_locations_ttl_ms' flag, and defaults to one hour.
+   *
+   * @param lowerBound inclusive lower bound, may be empty but not null
+   * @param upperBound exclusive upper bound, may be empty but not null
+   * @return this instance
+   */
+  public AlterTableOptions addRangePartition(PartialRow lowerBound, PartialRow upperBound) {
+    Preconditions.checkNotNull(lowerBound);
+    Preconditions.checkNotNull(upperBound);
+    Preconditions.checkArgument(lowerBound.getSchema().equals(upperBound.getSchema()));
+
+    AlterTableRequestPB.Step.Builder step = pb.addAlterSchemaStepsBuilder();
+    step.setType(AlterTableRequestPB.StepType.ADD_RANGE_PARTITION);
+    AlterTableRequestPB.AddRangePartition.Builder builder =
+        AlterTableRequestPB.AddRangePartition.newBuilder();
+    builder.setRangeBounds(
+        new Operation.OperationsEncoder().encodeLowerAndUpperBounds(lowerBound, upperBound));
+    step.setAddRangePartition(builder);
+    if (!pb.hasSchema()) {
+      pb.setSchema(ProtobufHelper.schemaToPb(lowerBound.getSchema()));
+    }
+    return this;
+  }
+
+  /**
+   * Drop the range partition from the table with the specified inclusive lower bound and exclusive
+   * upper bound. The bounds must match exactly, and may not span multiple range partitions.
+   *
+   * If either row is empty, then that end of the range will be unbounded. If a range column is
+   * missing a value, the logical minimum value for that column type will be used as the default.
+   *
+   * Multiple range partitions may be dropped as part of a single alter table transaction by calling
+   * this method multiple times.
+   *
+   * @param lowerBound inclusive lower bound, can be empty but not null
+   * @param upperBound exclusive upper bound, can be empty but not null
+   * @return this instance
+   */
+  public AlterTableOptions dropRangePartition(PartialRow lowerBound, PartialRow upperBound) {
+    Preconditions.checkNotNull(lowerBound);
+    Preconditions.checkNotNull(upperBound);
+    Preconditions.checkArgument(lowerBound.getSchema().equals(upperBound.getSchema()));
+
+    AlterTableRequestPB.Step.Builder step = pb.addAlterSchemaStepsBuilder();
+    step.setType(AlterTableRequestPB.StepType.DROP_RANGE_PARTITION);
+    AlterTableRequestPB.DropRangePartition.Builder builder =
+        AlterTableRequestPB.DropRangePartition.newBuilder();
+    builder.setRangeBounds(
+        new Operation.OperationsEncoder().encodeLowerAndUpperBounds(lowerBound, upperBound));
+    step.setDropRangePartition(builder);
+    if (!pb.hasSchema()) {
+      pb.setSchema(ProtobufHelper.schemaToPb(lowerBound.getSchema()));
+    }
+    return this;
+  }
+
+  /**
+   * @return {@code true} if the alter table operation includes an add or drop partition operation
+   */
+  @InterfaceAudience.Private
+  boolean hasAddDropRangePartitions() {
+    return pb.hasSchema();
+  }
+
+  /**
+   * @return the AlterTableRequest protobuf message.
+   */
+  AlterTableRequestPB.Builder getProtobuf() {
+    return pb;
+  }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/acf093c1/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
index bf3dce2..391e549 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
@@ -16,6 +16,7 @@
 // under the License.
 package org.apache.kudu.client;
 
+import com.google.common.collect.ImmutableList;
 import com.google.protobuf.Message;
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.util.Pair;
@@ -23,6 +24,9 @@ import org.jboss.netty.buffer.ChannelBuffer;
 
 import static org.apache.kudu.master.Master.*;
 
+import java.util.Collection;
+import java.util.List;
+
 /**
  * RPC used to alter a table. When it returns it doesn't mean that the table is altered,
  * a success just means that the master accepted it.
@@ -33,18 +37,21 @@ class AlterTableRequest extends KuduRpc<AlterTableResponse> {
   static final String ALTER_TABLE = "AlterTable";
   private final String name;
   private final AlterTableRequestPB.Builder builder;
+  private final List<Integer> requiredFeatures;
 
   AlterTableRequest(KuduTable masterTable, String name, AlterTableOptions ato) {
     super(masterTable);
     this.name = name;
-    this.builder = ato.pb;
+    this.builder = ato.getProtobuf();
+    this.requiredFeatures = ato.hasAddDropRangePartitions() ?
+        ImmutableList.of(MasterFeatures.RANGE_PARTITION_BOUNDS_VALUE) :
+        ImmutableList.<Integer>of();
   }
 
   @Override
   ChannelBuffer serialize(Message header) {
     assert header.isInitialized();
-    TableIdentifierPB tableID =
-        TableIdentifierPB.newBuilder().setTableName(name).build();
+    TableIdentifierPB tableID = TableIdentifierPB.newBuilder().setTableName(name).build();
     this.builder.setTable(tableID);
     return toChannelBuffer(header, this.builder.build());
   }
@@ -67,4 +74,9 @@ class AlterTableRequest extends KuduRpc<AlterTableResponse> {
     return new Pair<AlterTableResponse, Object>(
         response, respBuilder.hasError() ? respBuilder.getError() : null);
   }
+
+  @Override
+  Collection<Integer> getRequiredFeatures() {
+    return requiredFeatures;
+  }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/acf093c1/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index fc29b71..2b0cb75 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -83,8 +83,6 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
@@ -139,24 +137,26 @@ public class AsyncKuduClient implements AutoCloseable {
   public static final long DEFAULT_OPERATION_TIMEOUT_MS = 30000;
   public static final long DEFAULT_SOCKET_READ_TIMEOUT_MS = 10000;
   private static final long MAX_RPC_ATTEMPTS = 100;
+  static final int MAX_RETURNED_TABLE_LOCATIONS = 10;
 
   private final ClientSocketChannelFactory channelFactory;
 
   /**
-   * This map and the next 2 maps contain the same data, but indexed
-   * differently. There is no consistency guarantee across the maps.
-   * They are not updated all at the same time atomically.  This map
-   * is always the first to be updated, because that's the map from
-   * which all the lookups are done in the fast-path of the requests
-   * that need to locate a tablet. The second map to be updated is
-   * tablet2client, because it comes second in the fast-path
-   * of every requests that need to locate a tablet. The third map
-   * is only used to handle TabletServer disconnections gracefully.
+   * This map and the next 2 maps contain data cached from calls to the master's
+   * GetTableLocations RPC. There is no consistency guarantee across the maps.
+   * They are not updated all at the same time atomically.
+   *
+   * {@code tableLocations} is always the first to be updated because it's the
+   * map from which all the lookups are done in the fast-path of the requests
+   * that need to locate a tablet. {@code tablet2client} is updated second,
+   * because it comes second in the fast-path of every request that needs to
+   * locate a tablet. {@code client2tablets} is only used to handle TabletServer
+   * disconnections gracefully.
    *
    * This map is keyed by table ID.
    */
-  private final ConcurrentHashMap<String, ConcurrentSkipListMap<byte[],
-      RemoteTablet>> tabletsCache = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, TableLocationsCache> tableLocations =
+      new ConcurrentHashMap<>();
 
   /**
    * Maps a tablet ID to the RemoteTablet that knows where all the replicas are served.
@@ -171,16 +171,6 @@ public class AsyncKuduClient implements AutoCloseable {
       new ConcurrentHashMap<>();
 
   /**
-   * Map of table ID to non-covered range cache.
-   *
-   * TODO: Currently once a non-covered range is added to the cache, it is never
-   * removed. Once adding range partitions becomes possible entries will need to
-   * be expired.
-   */
-  private final ConcurrentMap<String, NonCoveredRangeCache> nonCoveredRangeCaches =
-      new ConcurrentHashMap<>();
-
-  /**
    * Cache that maps a TabletServer address ("ip:port") to the clients
    * connected to it.
    * <p>
@@ -360,7 +350,24 @@ public class AsyncKuduClient implements AutoCloseable {
     checkIsClosed();
     AlterTableRequest alter = new AlterTableRequest(this.masterTable, name, ato);
     alter.setTimeoutMillis(defaultAdminOperationTimeoutMs);
-    return sendRpcToTablet(alter);
+    Deferred<AlterTableResponse> response = sendRpcToTablet(alter);
+
+    if (ato.hasAddDropRangePartitions()) {
+      // Clear the table locations cache so the new partition is immediately visible.
+      Callback clearCacheCB = new Callback() {
+        @Override
+        public Object call(Object resp) throws Exception {
+          tableLocations.clear();
+          return resp;
+        }
+        @Override
+        public String toString() {
+          return "ClearCacheCB";
+        }
+      };
+      return response.addCallback(clearCacheCB).addErrback(clearCacheCB);
+    }
+    return response;
   }
 
   /**
@@ -681,16 +688,15 @@ public class AsyncKuduClient implements AutoCloseable {
     request.attempt++;
     final String tableId = request.getTable().getTableId();
     byte[] partitionKey = request.partitionKey();
-    RemoteTablet tablet = getTablet(tableId, partitionKey);
-
-    if (tablet == null && partitionKey != null) {
-      // Check if the RPC is in a non-covered range.
-      Map.Entry<byte[], byte[]> nonCoveredRange = getNonCoveredRange(tableId, partitionKey);
-      if (nonCoveredRange != null) {
-        return Deferred.fromError(new NonCoveredRangeException(nonCoveredRange.getKey(),
-                                                               nonCoveredRange.getValue()));
-      }
-      // Otherwise fall through to below where a GetTableLocations lookup will occur.
+    TableLocationsCache.Entry entry = getTableLocationEntry(tableId, partitionKey);
+
+    if (entry != null && entry.isNonCoveredRange()) {
+      Exception e = new NonCoveredRangeException(entry.getLowerBoundPartitionKey(),
+                                                 entry.getUpperBoundPartitionKey());
+      // Sending both as an errback and returning fromError because sendRpcToTablet might be
+      // called via a callback that won't care about the returned Deferred.
+      request.errback(e);
+      return Deferred.fromError(e);
     }
 
     // Set the propagated timestamp so that the next time we send a message to
@@ -705,7 +711,8 @@ public class AsyncKuduClient implements AutoCloseable {
     // disconnected, say because we didn't query that tablet for some seconds, then we'll try to
     // reconnect based on the old information. If that fails, we'll instead continue with the next
     // block that queries the master.
-    if (tablet != null) {
+    if (entry != null) {
+      RemoteTablet tablet = entry.getTablet();
       TabletClient tabletClient = clientFor(tablet);
       if (tabletClient != null) {
         final Deferred<R> d = request.getDeferred();
@@ -971,13 +978,17 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
-   * This method first clears tabletsCache and then tablet2client without any regards for
-   * calls to {@link #discoverTablets}. Call only when AsyncKuduClient is in a steady state.
-   * @param tableId table for which we remove all the RemoteTablet entries
+   * Clears {@link #tableLocations} and {@link #tablet2client} of the table's
+   * entries.
+   *
+   * This method makes the maps momentarily inconsistent, and should only be
+   * used when the {@code AsyncKuduClient} is in a steady state.
+   * @param tableId table for which we remove all cached tablet location and
+   *                tablet client entries
    */
   @VisibleForTesting
   void emptyTabletsCacheForTable(String tableId) {
-    tabletsCache.remove(tableId);
+    tableLocations.remove(tableId);
     Set<Map.Entry<Slice, RemoteTablet>> tablets = tablet2client.entrySet();
     for (Map.Entry<Slice, RemoteTablet> entry : tablets) {
       if (entry.getValue().getTableId().equals(tableId)) {
@@ -1054,8 +1065,8 @@ public class AsyncKuduClient implements AutoCloseable {
       // If we failed to acquire a permit, it's worth checking if someone
       // looked up the tablet we're interested in.  Every once in a while
       // this will save us a Master lookup.
-      RemoteTablet tablet = getTablet(tableId, partitionKey);
-      if (tablet != null && clientFor(tablet) != null) {
+      TableLocationsCache.Entry entry = getTableLocationEntry(tableId, partitionKey);
+      if (entry != null && !entry.isNonCoveredRange() && clientFor(entry.getTablet()) != null) {
         return Deferred.fromResult(null);  // Looks like no lookup needed.
       }
     }
@@ -1152,16 +1163,13 @@ public class AsyncKuduClient implements AutoCloseable {
            (partitionKey.length > 0 &&
             (endPartitionKey == null || Bytes.memcmp(partitionKey, endPartitionKey) < 0))) {
       byte[] key = partitionKey == null ? EMPTY_ARRAY : partitionKey;
-      RemoteTablet tablet = getTablet(tableId, key);
-      if (tablet != null) {
-        ret.add(new LocatedTablet(tablet));
-        partitionKey = tablet.getPartition().getPartitionKeyEnd();
-        continue;
-      }
+      TableLocationsCache.Entry entry = getTableLocationEntry(tableId, key);
 
-      Map.Entry<byte[], byte[]> nonCoveredRange = getNonCoveredRange(tableId, key);
-      if (nonCoveredRange != null) {
-        partitionKey = nonCoveredRange.getValue();
+      if (entry != null) {
+        if (!entry.isNonCoveredRange()) {
+          ret.add(new LocatedTablet(entry.getTablet()));
+        }
+        partitionKey = entry.getUpperBoundPartitionKey();
         continue;
       }
 
@@ -1288,14 +1296,13 @@ public class AsyncKuduClient implements AutoCloseable {
         }
       } else {
         try {
-          discoverTablets(table, response.getTabletLocationsList());
+          discoverTablets(table,
+                          partitionKey,
+                          response.getTabletLocationsList(),
+                          response.getTtlMillis());
         } catch (NonRecoverableException e) {
           return e;
         }
-        if (partitionKey != null) {
-          discoverNonCoveredRangePartitions(table.getTableId(), partitionKey,
-                                            response.getTabletLocationsList());
-        }
       }
       return null;
     }
@@ -1323,33 +1330,47 @@ public class AsyncKuduClient implements AutoCloseable {
     masterLookups.release();
   }
 
+  /**
+   * Makes discovered tablet locations visible in the client's caches.
+   * @param table the table which the locations belong to
+   * @param requestPartitionKey the partition key of the table locations request
+   * @param locations the discovered locations
+   * @param ttl the ttl of the locations
+   */
   @VisibleForTesting
-  void discoverTablets(KuduTable table, List<Master.TabletLocationsPB> locations)
-      throws NonRecoverableException {
+  void discoverTablets(KuduTable table,
+                       byte[] requestPartitionKey,
+                       List<Master.TabletLocationsPB> locations,
+                       long ttl) throws NonRecoverableException {
     String tableId = table.getTableId();
     String tableName = table.getName();
 
-    // Doing a get first instead of putIfAbsent to avoid creating unnecessary CSLMs because in
-    // the most common case the table should already be present
-    ConcurrentSkipListMap<byte[], RemoteTablet> tablets = tabletsCache.get(tableId);
-    if (tablets == null) {
-      tablets = new ConcurrentSkipListMap<>(Bytes.MEMCMP);
-      ConcurrentSkipListMap<byte[], RemoteTablet> oldTablets =
-          tabletsCache.putIfAbsent(tableId, tablets);
-      if (oldTablets != null) {
-        tablets = oldTablets;
+    // Doing a get first instead of putIfAbsent to avoid creating unnecessary
+    // table locations caches because in the most common case the table should
+    // already be present.
+    TableLocationsCache locationsCache = tableLocations.get(tableId);
+    if (locationsCache == null) {
+      locationsCache = new TableLocationsCache();
+      TableLocationsCache existingLocationsCache =
+          tableLocations.putIfAbsent(tableId, locationsCache);
+      if (existingLocationsCache != null) {
+        locationsCache = existingLocationsCache;
       }
     }
 
+    // Build the list of discovered remote tablet instances. If we have
+    // already discovered the tablet, its locations are refreshed.
+    List<RemoteTablet> tablets = new ArrayList<>(locations.size());
     for (Master.TabletLocationsPB tabletPb : locations) {
-      // Early creating the tablet so that it parses out the pb
+      // Early creating the tablet so that it parses out the pb.
       RemoteTablet rt = createTabletFromPb(tableId, tabletPb);
       Slice tabletId = rt.tabletId;
 
-      // If we already know about this one, just refresh the locations
+      // If we already know about this tablet, refresh the locations.
       RemoteTablet currentTablet = tablet2client.get(tabletId);
       if (currentTablet != null) {
         currentTablet.refreshTabletClients(tabletPb);
+        tablets.add(currentTablet);
         continue;
       }
 
@@ -1363,58 +1384,12 @@ public class AsyncKuduClient implements AutoCloseable {
       LOG.info("Discovered tablet {} for table '{}' with partition {}",
                tabletId.toString(Charset.defaultCharset()), tableName, rt.getPartition());
       rt.refreshTabletClients(tabletPb);
-      // This is making this tablet available
-      // Even if two clients were racing in this method they are putting the same RemoteTablet
-      // with the same start key in the CSLM in the end
-      tablets.put(rt.getPartition().getPartitionKeyStart(), rt);
-    }
-  }
-
-  private void discoverNonCoveredRangePartitions(String tableId,
-                                                 byte[] partitionKey,
-                                                 List<Master.TabletLocationsPB> locations) {
-    NonCoveredRangeCache nonCoveredRanges = nonCoveredRangeCaches.get(tableId);
-    if (nonCoveredRanges == null) {
-      nonCoveredRanges = new NonCoveredRangeCache();
-      NonCoveredRangeCache oldCache = nonCoveredRangeCaches.putIfAbsent(tableId, nonCoveredRanges);
-      if (oldCache != null) {
-        nonCoveredRanges = oldCache;
-      }
-    }
-
-    // If there are no locations, then the table has no tablets. This is
-    // guaranteed because we never set an upper bound on the GetTableLocations
-    // request, and the master will always return the tablet *before* the start
-    // of the request, if the start key falls in a non-covered range (see the
-    // comment on GetTableLocationsResponsePB in master.proto).
-    if (locations.isEmpty()) {
-      nonCoveredRanges.addNonCoveredRange(EMPTY_ARRAY, EMPTY_ARRAY);
-      return;
-    }
-
-    // If the first tablet occurs after the requested partition key,
-    // then there is an initial non-covered range.
-    byte[] firstStartKey = locations.get(0).getPartition().getPartitionKeyStart().toByteArray();
-    if (Bytes.memcmp(partitionKey, firstStartKey) < 0) {
-      nonCoveredRanges.addNonCoveredRange(EMPTY_ARRAY, firstStartKey);
+      tablets.add(rt);
     }
 
-    byte[] previousEndKey = null;
-    for (Master.TabletLocationsPB location : locations) {
-      byte[] startKey = location.getPartition().getPartitionKeyStart().toByteArray();
-
-      // Check if there is a non-covered range between this tablet and the previous.
-      if (previousEndKey != null && Bytes.memcmp(previousEndKey, startKey) < 0) {
-        nonCoveredRanges.addNonCoveredRange(previousEndKey, startKey);
-      }
-      previousEndKey = location.getPartition().getPartitionKeyEnd().toByteArray();
-    }
-
-    if (previousEndKey.length > 0 && Bytes.memcmp(previousEndKey, partitionKey) <= 0) {
-      // This happens if the partition key falls in a non-covered range that
-      // is unbounded (to the right).
-      nonCoveredRanges.addNonCoveredRange(previousEndKey, EMPTY_ARRAY);
-    }
+    // Give the locations to the tablet location cache for the table, so that it
+    // can cache them and discover non-covered ranges.
+    locationsCache.cacheTabletLocations(tablets, requestPartitionKey, ttl);
   }
 
   RemoteTablet createTabletFromPb(String tableId, Master.TabletLocationsPB tabletPb) {
@@ -1424,42 +1399,17 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
-   * Gives the tablet's ID for the table ID and partition key.
-   * In the future there will be multiple tablets and this method will find the right one.
-   * @param tableId table to find the tablet for
-   * @return a tablet ID as a slice or null if not found
+   * Gets the tablet location cache entry for the tablet in the table covering a partition key.
+   * @param tableId the table
+   * @param partitionKey the partition key of the tablet to find
+   * @return a tablet location cache entry, or null if the partition key has not been discovered
    */
-  RemoteTablet getTablet(String tableId, byte[] partitionKey) {
-    ConcurrentSkipListMap<byte[], RemoteTablet> tablets = tabletsCache.get(tableId);
-
-    if (tablets == null) {
+  TableLocationsCache.Entry getTableLocationEntry(String tableId, byte[] partitionKey) {
+    TableLocationsCache cache = tableLocations.get(tableId);
+    if (cache == null) {
       return null;
     }
-
-    // We currently only have one master tablet.
-    if (isMasterTable(tableId)) {
-      if (tablets.firstEntry() == null) {
-        return null;
-      }
-      return tablets.firstEntry().getValue();
-    }
-
-    Map.Entry<byte[], RemoteTablet> tabletPair = tablets.floorEntry(partitionKey);
-
-    if (tabletPair == null) {
-      return null;
-    }
-
-    Partition partition = tabletPair.getValue().getPartition();
-
-    // If the partition is not the end partition, but it doesn't include the key
-    // we are looking for, then we have not yet found the correct tablet.
-    if (!partition.isEndPartition()
-        && Bytes.memcmp(partitionKey, partition.getPartitionKeyEnd()) >= 0) {
-      return null;
-    }
-
-    return tabletPair.getValue();
+    return cache.get(partitionKey);
   }
 
   /**
@@ -1491,10 +1441,24 @@ public class AsyncKuduClient implements AutoCloseable {
             Preconditions.checkArgument(tablets.size() <= 1,
                                         "found more than one tablet for a single partition key");
             if (tablets.size() == 0) {
-              Map.Entry<byte[], byte[]> nonCoveredRange =
-                  nonCoveredRangeCaches.get(table.getTableId()).getNonCoveredRange(partitionKey);
-              return Deferred.fromError(new NonCoveredRangeException(nonCoveredRange.getKey(),
-                                                                     nonCoveredRange.getValue()));
+              // Most likely this indicates a non-covered range, but since this
+              // could race with an alter table partitioning operation (which
+              // clears the local table locations cache), we check again.
+              TableLocationsCache.Entry entry = getTableLocationEntry(table.getTableId(),
+                                                                      partitionKey);
+
+              if (entry == null) {
+                // This should be extremely rare, but a potential source of tight loops.
+                LOG.debug("Table location expired before it could be processed; retrying.");
+                return Deferred.fromError(new RecoverableException(Status.NotFound(
+                    "Table location expired before it could be processed")));
+              }
+              if (entry.isNonCoveredRange()) {
+                return Deferred.fromError(
+                    new NonCoveredRangeException(entry.getLowerBoundPartitionKey(),
+                                                 entry.getUpperBoundPartitionKey()));
+              }
+              return Deferred.fromResult(new LocatedTablet(entry.getTablet()));
             }
             return Deferred.fromResult(tablets.get(0));
           }
@@ -1502,23 +1466,6 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
-   * Returns the non-covered range partition containing the {@code partitionKey} in
-   * the table, or null if there is no known non-covering range for the partition key.
-   * @param tableId of the table
-   * @param partitionKey to lookup
-   * @return the non-covering partition range, or {@code null}
-   */
-   Map.Entry<byte[], byte[]> getNonCoveredRange(String tableId, byte[] partitionKey) {
-     if (isMasterTable(tableId)) {
-       throw new IllegalArgumentException("No non-covering range partitions for the master");
-     }
-     NonCoveredRangeCache nonCoveredRangeCache = nonCoveredRangeCaches.get(tableId);
-     if (nonCoveredRangeCache == null) return null;
-
-     return nonCoveredRangeCache.getNonCoveredRange(partitionKey);
-   }
-
-  /**
    * Retrieve the master registration (see {@link GetMasterRegistrationResponse}
    * for a replica.
    * @param masterClient An initialized client for the master replica.

http://git-wip-us.apache.org/repos/asf/kudu/blob/acf093c1/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
index 953525a..1bc2a14 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
@@ -79,6 +79,7 @@ class GetTableLocationsRequest extends KuduRpc<Master.GetTableLocationsResponseP
     if (endKey != null) {
       builder.setPartitionKeyEnd(ZeroCopyLiteralByteString.wrap(endKey));
     }
+    builder.setMaxReturnedLocations(AsyncKuduClient.MAX_RETURNED_TABLE_LOCATIONS);
     return toChannelBuffer(header, builder.build());
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/acf093c1/java/kudu-client/src/main/java/org/apache/kudu/client/NonCoveredRangeCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/NonCoveredRangeCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/NonCoveredRangeCache.java
deleted file mode 100644
index ac72057..0000000
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/NonCoveredRangeCache.java
+++ /dev/null
@@ -1,104 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.kudu.client;
-
-import com.google.common.base.Joiner;
-import com.google.common.primitives.UnsignedBytes;
-
-import java.util.Comparator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import javax.annotation.concurrent.ThreadSafe;
-
-import org.apache.kudu.annotations.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A cache of the non-covered range partitions in a Kudu table.
- *
- * Currently entries are never invalidated from the cache.
- */
-@ThreadSafe
-@InterfaceAudience.Private
-class NonCoveredRangeCache {
-  private static final Logger LOG = LoggerFactory.getLogger(NonCoveredRangeCache.class);
-  private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
-
-  private final ConcurrentNavigableMap<byte[], byte[]> nonCoveredRanges =
-      new ConcurrentSkipListMap<>(COMPARATOR);
-
-  /**
-   * Retrieves a non-covered range from the cache.
-   *
-   * The pair contains the inclusive start partition key and the exclusive end
-   * partition key containing the provided partition key. If there is no such
-   * cached range, null is returned.
-   *
-   * @param partitionKey the partition key to lookup in the cache
-   * @return the non covered range, or null
-   */
-  public Map.Entry<byte[], byte[]> getNonCoveredRange(byte[] partitionKey) {
-    Map.Entry<byte[], byte[]> range = nonCoveredRanges.floorEntry(partitionKey);
-    if (range == null ||
-        (range.getValue().length != 0 && COMPARATOR.compare(partitionKey, range.getValue()) >= 0)) {
-      return null;
-    } else {
-      return range;
-    }
-  }
-
-  /**
-   * Adds a non-covered range to the cache.
-   *
-   * @param startPartitionKey the inclusive start partition key of the non-covered range
-   * @param endPartitionKey the exclusive end partition key of the non-covered range
-   */
-  public void addNonCoveredRange(byte[] startPartitionKey, byte[] endPartitionKey) {
-    if (startPartitionKey == null || endPartitionKey == null) {
-      throw new IllegalArgumentException("Non-covered partition range keys may not be null");
-    }
-    // Concurrent additions of the same non-covered range key are handled by
-    // serializing puts through the concurrent map.
-    if (nonCoveredRanges.put(startPartitionKey, endPartitionKey) == null) {
-      LOG.info("Discovered non-covered partition range [{}, {})",
-               Bytes.hex(startPartitionKey), Bytes.hex(endPartitionKey));
-    }
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append('[');
-    boolean isFirst = true;
-    for (Map.Entry<byte[], byte[]> range : nonCoveredRanges.entrySet()) {
-      if (isFirst) {
-        isFirst = false;
-      } else {
-        sb.append(", ");
-      }
-      sb.append('[');
-      sb.append(range.getKey().length == 0 ? "<start>" : Bytes.hex(range.getKey()));
-      sb.append(", ");
-      sb.append(range.getValue().length == 0 ? "<end>" : Bytes.hex(range.getValue()));
-      sb.append(')');
-    }
-    sb.append(']');
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/kudu/blob/acf093c1/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index 0af0063..e9a718e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -341,5 +341,12 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
 
       return toPB();
     }
+
+    public RowOperationsPB encodeLowerAndUpperBounds(PartialRow lowerBound, PartialRow upperBound) {
+      init(lowerBound.getSchema(), 2);
+      encodeRow(lowerBound, ChangeType.RANGE_LOWER_BOUND);
+      encodeRow(upperBound, ChangeType.RANGE_UPPER_BOUND);
+      return toPB();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/acf093c1/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
new file mode 100644
index 0000000..0c27bfa
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TableLocationsCache.java
@@ -0,0 +1,279 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedBytes;
+
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.client.AsyncKuduClient.RemoteTablet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * A cache of the tablet locations in a table, keyed by partition key. Entries
+ * in the cache are either tablets or non-covered ranges.
+ */
+@ThreadSafe
+@InterfaceAudience.Private
+class TableLocationsCache {
+  private static final Logger LOG = LoggerFactory.getLogger(TableLocationsCache.class);
+  private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
+
+  private final Object monitor = new Object();
+
+  @GuardedBy("monitor")
+  private final NavigableMap<byte[], Entry> entries = new TreeMap<>(COMPARATOR);
+
+  public Entry get(byte[] partitionKey) {
+    if (partitionKey == null) {
+      // Master lookup.
+      synchronized (monitor) {
+        Preconditions.checkState(entries.size() <= 1);
+        return entries.get(AsyncKuduClient.EMPTY_ARRAY);
+      }
+    }
+    Map.Entry<byte[], Entry> entry;
+    synchronized (monitor) {
+      entry = entries.floorEntry(partitionKey);
+    }
+
+    if (entry == null ||
+        (entry.getValue().getUpperBoundPartitionKey().length > 0 &&
+            Bytes.memcmp(partitionKey, entry.getValue().getUpperBoundPartitionKey()) >= 0) ||
+        entry.getValue().isStale()) {
+      return null;
+    }
+    return entry.getValue();
+  }
+
+  /**
+   * Add tablet locations to the cache.
+   *
+   * Already known tablet locations will have their entry updated and deadline extended.
+   *
+   * @param tablets the discovered tablets to cache
+   * @param requestPartitionKey the lookup partition key
+   * @param ttl the time in milliseconds that the tablets may be cached for
+   */
+  public void cacheTabletLocations(List<RemoteTablet> tablets,
+                                   byte[] requestPartitionKey,
+                                   long ttl) {
+    long deadline = System.nanoTime() + ttl * TimeUnit.MILLISECONDS.toNanos(1);
+    if (requestPartitionKey == null) {
+      // Master lookup.
+      Preconditions.checkArgument(tablets.size() == 1);
+      Entry entry = Entry.tablet(tablets.get(0), TimeUnit.DAYS.toMillis(1));
+      synchronized (monitor) {
+        entries.clear();
+        entries.put(AsyncKuduClient.EMPTY_ARRAY, entry);
+      }
+      return;
+    }
+
+    List<Entry> newEntries = new ArrayList<>();
+
+    if (tablets.isEmpty()) {
+      // If there are no tablets in the response, then the table is empty. If
+      // there were any tablets in the table they would have been returned, since
+      // the master guarantees that if the partition key falls in a non-covered
+      // range, the previous tablet will be returned, and we did not set an upper
+      // bound partition key on the request.
+      newEntries.add(Entry.nonCoveredRange(AsyncKuduClient.EMPTY_ARRAY,
+                                           AsyncKuduClient.EMPTY_ARRAY,
+                                           deadline));
+    } else {
+      // The comments below will reference the following diagram:
+      //
+      //   +---+   +---+---+
+      //   |   |   |   |   |
+      // A | B | C | D | E | F
+      //   |   |   |   |   |
+      //   +---+   +---+---+
+      //
+      // It depicts a tablet locations response from the master containing three
+      // tablets: B, D and E. Three non-covered ranges are present: A, C, and F.
+      // An RPC response containing B, D and E could occur if the lookup partition
+      // key falls in A, B, or C, although the existence of A as an initial
+      // non-covered range can only be inferred if the lookup partition key falls
+      // in A.
+
+      final byte[] firstLowerBound = tablets.get(0).getPartition().getPartitionKeyStart();
+
+      if (Bytes.memcmp(requestPartitionKey, firstLowerBound) < 0) {
+        // If the first tablet is past the requested partition key, then the
+        // partition key falls in an initial non-covered range, such as A.
+        newEntries.add(Entry.nonCoveredRange(AsyncKuduClient.EMPTY_ARRAY, firstLowerBound, deadline));
+      }
+
+      // lastUpperBound tracks the upper bound of the previously processed
+      // entry, so that we can determine when we have found a non-covered range.
+      byte[] lastUpperBound = firstLowerBound;
+
+      for (RemoteTablet tablet : tablets) {
+        final byte[] tabletLowerBound = tablet.getPartition().getPartitionKeyStart();
+        final byte[] tabletUpperBound = tablet.getPartition().getPartitionKeyEnd();
+
+        if (Bytes.memcmp(lastUpperBound, tabletLowerBound) < 0) {
+          // There is a non-covered range between the previous tablet and this tablet.
+          // This will discover C while processing the tablet location for D.
+          newEntries.add(Entry.nonCoveredRange(lastUpperBound, tabletLowerBound, deadline));
+        }
+        lastUpperBound = tabletUpperBound;
+
+        // Now add the tablet itself (such as B, D, or E).
+        newEntries.add(Entry.tablet(tablet, deadline));
+      }
+
+      if (lastUpperBound.length > 0 &&
+          tablets.size() < AsyncKuduClient.MAX_RETURNED_TABLE_LOCATIONS) {
+        // There is a non-covered range between the last tablet and the end of the
+        // partition key space, such as F.
+        newEntries.add(Entry.nonCoveredRange(lastUpperBound, AsyncKuduClient.EMPTY_ARRAY, deadline));
+      }
+    }
+
+    byte[] discoveredlowerBound = newEntries.get(0).getLowerBoundPartitionKey();
+    byte[] discoveredUpperBound = newEntries.get(newEntries.size() - 1)
+                                            .getUpperBoundPartitionKey();
+
+    LOG.debug("Discovered table locations:\t{}", newEntries);
+
+    synchronized (monitor) {
+      // Remove all existing overlapping entries, and add the new entries.
+      Map.Entry<byte[], Entry> floorEntry = entries.floorEntry(discoveredlowerBound);
+      if (floorEntry != null &&
+          Bytes.memcmp(requestPartitionKey,
+                       floorEntry.getValue().getUpperBoundPartitionKey()) < 0) {
+        discoveredlowerBound = floorEntry.getKey();
+      }
+
+      NavigableMap<byte[], Entry> overlappingEntries = entries.tailMap(discoveredlowerBound, true);
+      if (discoveredUpperBound.length > 0) {
+        overlappingEntries = overlappingEntries.headMap(discoveredUpperBound, false);
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Existing table locations:\t\t{}", entries.values());
+        LOG.trace("Removing table locations:\t\t{}", overlappingEntries.values());
+      }
+      overlappingEntries.clear();
+
+      for (Entry entry : newEntries) {
+        entries.put(entry.getLowerBoundPartitionKey(), entry);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return entries.values().toString();
+  }
+
+  /**
+   * An entry in the meta cache. Represents either a non-covered range, or a tablet.
+   */
+  public static class Entry {
+    /** The remote tablet, only set if this entry represents a tablet. */
+    private final RemoteTablet tablet;
+    /** The lower bound partition key, only set if this is a non-covered range. */
+    private final byte[] lowerBoundPartitionKey;
+    /** The upper bound partition key, only set if this is a non-covered range. */
+    private final byte[] upperBoundPartitionKey;
+    /** Deadline in ns relative to the System nanotime clock. */
+    private final long deadline;
+
+    private Entry(RemoteTablet tablet,
+                  byte[] lowerBoundPartitionKey,
+                  byte[] upperBoundPartitionKey,
+                  long deadline) {
+      this.tablet = tablet;
+      this.lowerBoundPartitionKey = lowerBoundPartitionKey;
+      this.upperBoundPartitionKey = upperBoundPartitionKey;
+      this.deadline = deadline;
+    }
+
+    public static Entry nonCoveredRange(byte[] lowerBoundPartitionKey,
+                                        byte[] upperBoundPartitionKey,
+                                        long deadline) {
+      return new Entry(null, lowerBoundPartitionKey, upperBoundPartitionKey, deadline);
+    }
+
+    public static Entry tablet(RemoteTablet tablet, long deadline) {
+      return new Entry(tablet, null, null, deadline);
+    }
+
+    /**
+     * @return {@code true} if this entry is a non-covered range.
+     */
+    public boolean isNonCoveredRange() {
+      return tablet == null;
+    }
+
+    /**
+     * @return the {@link RemoteTablet} for this tablet, or null
+     * if this is a non-covered range.
+     */
+    public RemoteTablet getTablet() {
+      return tablet;
+    }
+
+    public byte[] getLowerBoundPartitionKey() {
+      return tablet == null ? lowerBoundPartitionKey : tablet.getPartition().getPartitionKeyStart();
+    }
+
+    public byte[] getUpperBoundPartitionKey() {
+      return tablet == null ? upperBoundPartitionKey : tablet.getPartition().getPartitionKeyEnd();
+    }
+
+    private long ttl() {
+      return TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
+    }
+
+    public boolean isStale() {
+      return ttl() <= 0;
+    }
+
+    @Override
+    public String toString() {
+      if (isNonCoveredRange()) {
+        return MoreObjects.toStringHelper("NonCoveredRange")
+                          .add("lowerBoundPartitionKey", Bytes.hex(lowerBoundPartitionKey))
+                          .add("upperBoundPartitionKey", Bytes.hex(upperBoundPartitionKey))
+                          .add("ttl", ttl())
+                          .toString();
+      } else {
+        return MoreObjects.toStringHelper("Tablet")
+                          .add("lowerBoundPartitionKey", Bytes.hex(getLowerBoundPartitionKey()))
+                          .add("upperBoundPartitionKey", Bytes.hex(getUpperBoundPartitionKey()))
+                          .add("tablet-id", tablet.getTabletIdAsString())
+                          .add("ttl", ttl())
+                          .toString();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/acf093c1/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
index 8f092a1..af9e124 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
@@ -113,7 +113,7 @@ public class BaseKuduTest {
   }
 
   protected static KuduTable createTable(String tableName, Schema schema,
-                                         CreateTableOptions builder) throws Exception {
+                                         CreateTableOptions builder) throws KuduException {
     LOG.info("Creating table: {}", tableName);
     return client.syncClient().createTable(tableName, schema, builder);
   }
@@ -147,6 +147,27 @@ public class BaseKuduTest {
     return counter.get();
   }
 
+  /**
+   * Scans the table and returns the number of rows.
+   * @param table the table
+   * @param predicates optional predicates to apply to the scan
+   * @return the number of rows in the table matching the predicates
+   */
+  protected long countRowsInTable(KuduTable table, KuduPredicate... predicates)
+      throws KuduException {
+    long count = 0;
+    KuduScanner.KuduScannerBuilder scanBuilder = syncClient.newScannerBuilder(table);
+    for (KuduPredicate predicate : predicates) {
+      scanBuilder.addPredicate(predicate);
+    }
+    scanBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
+    KuduScanner scanner = scanBuilder.build();
+    while (scanner.hasMoreRows()) {
+      count += scanner.nextRows().getNumRows();
+    }
+    return count;
+  }
+
   protected List<String> scanTableToStrings(KuduTable table,
                                             KuduPredicate... predicates) throws Exception {
     List<String> rowStrings = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/kudu/blob/acf093c1/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java
new file mode 100644
index 0000000..83b9767
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAlterTable.java
@@ -0,0 +1,288 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.util.Pair;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class TestAlterTable extends BaseKuduTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestKuduClient.class);
+  private String tableName;
+
+  @Before
+  public void setTableName() {
+    tableName = TestKuduClient.class.getName() + "-" + System.currentTimeMillis();
+  }
+
+  /**
+   * Creates a new table with two int columns, c0 and c1. c0 is the primary key.
+   * The table is hash partitioned on c0 into two buckets, and range partitioned
+   * with the provided bounds.
+   */
+  private KuduTable createTable(List<Pair<Integer, Integer>> bounds) throws KuduException {
+    // Create the table with two hash buckets and one range partition for each
+    // provided bound (no explicit bounds are added if the list is empty).
+    ArrayList<ColumnSchema> columns = new ArrayList<>(1);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c0", Type.INT32)
+                                .nullable(false)
+                                .key(true)
+                                .build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.INT32)
+                                .nullable(false)
+                                .build());
+    Schema schema = new Schema(columns);
+
+    CreateTableOptions createOptions =
+        new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("c0"))
+                                .setNumReplicas(1)
+                                .addHashPartitions(ImmutableList.of("c0"), 2);
+
+    for (Pair<Integer, Integer> bound : bounds) {
+      PartialRow lower = schema.newPartialRow();
+      PartialRow upper = schema.newPartialRow();
+      lower.addInt("c0", bound.getFirst());
+      upper.addInt("c0", bound.getSecond());
+      createOptions.addRangeBound(lower, upper);
+    }
+
+    return BaseKuduTest.createTable(tableName, schema, createOptions);
+  }
+
+  /**
+   * Insert rows into the provided table. The table's columns must be ints, and
+   * must have a primary key in the first column.
+   * @param table the table
+   * @param start the inclusive start key
+   * @param end the exclusive end key
+   */
+  private void insertRows(KuduTable table, int start, int end) throws KuduException {
+    KuduSession session = syncClient.newSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
+    for (int i = start; i < end; i++) {
+      Insert insert = table.newInsert();
+      for (int idx = 0; idx < table.getSchema().getColumnCount(); idx++) {
+        insert.getRow().addInt(idx, i);
+      }
+      session.apply(insert);
+    }
+    session.flush();
+    RowError[] rowErrors = session.getPendingErrors().getRowErrors();
+    assertEquals(String.format("row errors: %s", Arrays.toString(rowErrors)), 0, rowErrors.length);
+  }
+
+  @Test
+  public void testAlterRangePartitioning() throws Exception {
+    KuduTable table = createTable(ImmutableList.<Pair<Integer,Integer>>of());
+    Schema schema = table.getSchema();
+
+    // Insert some rows, and then drop the partition and ensure that the table is empty.
+    insertRows(table, 0, 100);
+    assertEquals(100, countRowsInTable(table));
+    PartialRow lower = schema.newPartialRow();
+    PartialRow upper = schema.newPartialRow();
+    syncClient.alterTable(tableName, new AlterTableOptions().dropRangePartition(lower, upper));
+    assertEquals(0, countRowsInTable(table));
+
+    // Add new range partition and insert rows.
+    lower.addInt("c0", 0);
+    upper.addInt("c0", 100);
+    syncClient.alterTable(tableName, new AlterTableOptions().addRangePartition(lower, upper));
+    insertRows(table, 0, 100);
+    assertEquals(100, countRowsInTable(table));
+
+    // Replace the range partition with a different one.
+    AlterTableOptions options = new AlterTableOptions();
+    options.dropRangePartition(lower, upper);
+    lower.addInt("c0", 50);
+    upper.addInt("c0", 150);
+    options.addRangePartition(lower, upper);
+    syncClient.alterTable(tableName, options);
+    assertEquals(0, countRowsInTable(table));
+    insertRows(table, 50, 125);
+    assertEquals(75, countRowsInTable(table));
+
+    // Replace the range partition with the same one.
+    syncClient.alterTable(tableName, new AlterTableOptions().dropRangePartition(lower, upper)
+                                                            .addRangePartition(lower, upper));
+    assertEquals(0, countRowsInTable(table));
+    insertRows(table, 50, 125);
+    assertEquals(75, countRowsInTable(table));
+
+    // Alter table partitioning + alter table schema
+    lower.addInt("c0", 200);
+    upper.addInt("c0", 300);
+    syncClient.alterTable(tableName, new AlterTableOptions().addRangePartition(lower, upper)
+                                                            .renameTable(tableName + "-renamed")
+                                                            .addNullableColumn("c2", Type.INT32));
+    tableName = tableName + "-renamed";
+    insertRows(table, 200, 300);
+    assertEquals(175, countRowsInTable(table));
+    assertEquals(3, openTable(tableName).getSchema().getColumnCount());
+
+    // Drop all range partitions + alter table schema. This also serves to test
+    // specifying range bounds with a subset schema (since a column was
+    // previously added).
+    options = new AlterTableOptions();
+    options.dropRangePartition(lower, upper);
+    lower.addInt("c0", 50);
+    upper.addInt("c0", 150);
+    options.dropRangePartition(lower, upper);
+    options.dropColumn("c2");
+    syncClient.alterTable(tableName, options);
+    assertEquals(0, countRowsInTable(table));
+    assertEquals(2, openTable(tableName).getSchema().getColumnCount());
+  }
+
+  @Test
+  public void testAlterRangeParitioningInvalid() throws KuduException {
+    // Create initial table with single range partition covering [0, 100).
+    KuduTable table = createTable(ImmutableList.of(new Pair<>(0, 100)));
+    Schema schema = table.getSchema();
+    insertRows(table, 0, 100);
+    assertEquals(100, countRowsInTable(table));
+
+    // ADD [0, 100) <- illegal (duplicate)
+    PartialRow lower = schema.newPartialRow();
+    PartialRow upper = schema.newPartialRow();
+    lower.addInt("c0", 0);
+    upper.addInt("c0", 100);
+    try {
+      syncClient.alterTable(tableName, new AlterTableOptions().addRangePartition(lower, upper));
+    } catch (KuduException e) {
+      assertTrue(e.getStatus().isInvalidArgument());
+      assertTrue(e.getStatus().getMessage().contains(
+          "New partition conflicts with existing partition"));
+    }
+    assertEquals(100, countRowsInTable(table));
+
+    // ADD [50, 150) <- illegal (overlap)
+    lower.addInt("c0", 50);
+    upper.addInt("c0", 150);
+    try {
+      syncClient.alterTable(tableName, new AlterTableOptions().addRangePartition(lower, upper));
+    } catch (KuduException e) {
+      assertTrue(e.getStatus().isInvalidArgument());
+      assertTrue(e.getStatus().getMessage().contains(
+          "New partition conflicts with existing partition"));
+    }
+    assertEquals(100, countRowsInTable(table));
+
+    // ADD [-50, 50) <- illegal (overlap)
+    lower.addInt("c0", -50);
+    upper.addInt("c0", 50);
+    try {
+      syncClient.alterTable(tableName, new AlterTableOptions().addRangePartition(lower, upper));
+    } catch (KuduException e) {
+      assertTrue(e.getStatus().isInvalidArgument());
+      assertTrue(e.getStatus().getMessage().contains(
+          "New partition conflicts with existing partition"));
+    }
+    assertEquals(100, countRowsInTable(table));
+
+    // ADD [200, 300)
+    // ADD [-50, 150) <- illegal (overlap)
+    lower.addInt("c0", 200);
+    upper.addInt("c0", 300);
+    AlterTableOptions options = new AlterTableOptions();
+    options.addRangePartition(lower, upper);
+    lower.addInt("c0", -50);
+    upper.addInt("c0", 150);
+    options.addRangePartition(lower, upper);
+    try {
+      syncClient.alterTable(tableName, options);
+    } catch (KuduException e) {
+      assertTrue(e.getStatus().isInvalidArgument());
+      assertTrue(e.getStatus().getMessage().contains(
+          "New partition conflicts with existing partition"));
+    }
+    assertEquals(100, countRowsInTable(table));
+
+    // DROP [<start>, <end>)
+    try {
+      syncClient.alterTable(tableName,
+                            new AlterTableOptions().dropRangePartition(schema.newPartialRow(),
+                                                                       schema.newPartialRow()));
+    } catch (KuduException e) {
+      assertTrue(e.getStatus().isInvalidArgument());
+      assertTrue(e.getStatus().getMessage(), e.getStatus().getMessage().contains(
+          "No tablet found for drop partition step"));
+    }
+    assertEquals(100, countRowsInTable(table));
+
+    // DROP [50, 150)
+    // RENAME foo
+    lower.addInt("c0", 50);
+    upper.addInt("c0", 150);
+    try {
+      syncClient.alterTable(tableName, new AlterTableOptions().dropRangePartition(lower, upper)
+                                                              .renameTable("foo"));
+    } catch (KuduException e) {
+      assertTrue(e.getStatus().isInvalidArgument());
+      assertTrue(e.getStatus().getMessage().contains(
+          "No tablet found for drop partition step"));
+    }
+    assertEquals(100, countRowsInTable(table));
+    assertFalse(syncClient.tableExists("foo"));
+
+    // DROP [0, 100)
+    // ADD  [100, 200)
+    // DROP [100, 200)
+    // ADD  [150, 250)
+    // DROP [0, 10)    <- illegal
+    options = new AlterTableOptions();
+
+    lower.addInt("c0", 0);
+    upper.addInt("c0", 100);
+    options.dropRangePartition(lower, upper);
+
+    lower.addInt("c0", 100);
+    upper.addInt("c0", 200);
+    options.addRangePartition(lower, upper);
+    options.dropRangePartition(lower, upper);
+
+    lower.addInt("c0", 150);
+    upper.addInt("c0", 250);
+    options.addRangePartition(lower, upper);
+
+    lower.addInt("c0", 0);
+    upper.addInt("c0", 10);
+    options.dropRangePartition(lower, upper);
+    try {
+      syncClient.alterTable(tableName, options);
+    } catch (KuduException e) {
+      assertTrue(e.getStatus().isInvalidArgument());
+      assertTrue(e.getStatus().getMessage().contains(
+          "No tablet found for drop partition step"));
+    }
+    assertEquals(100, countRowsInTable(table));
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/acf093c1/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
index 14cf5ec..ad6b5d8 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
@@ -147,7 +147,7 @@ public class TestAsyncKuduClient extends BaseKuduTest {
     try {
       KuduTable badTable = new KuduTable(client, "Invalid table name",
           "Invalid table ID", null, null);
-      client.discoverTablets(badTable, tabletLocations);
+      client.discoverTablets(badTable, null, tabletLocations, 1000);
       fail("This should have failed quickly");
     } catch (Exception ex) {
       assertTrue(ex instanceof NonRecoverableException);

http://git-wip-us.apache.org/repos/asf/kudu/blob/acf093c1/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index 62095da..cf14b7c 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -120,7 +120,8 @@ public class TestAsyncKuduSession extends BaseKuduTest {
     // Make sure tablet locations is cached.
     Insert insert = createInsert(1);
     session.apply(insert).join(DEFAULT_SLEEP);
-    RemoteTablet rt = client.getTablet(table.getTableId(), insert.partitionKey());
+    RemoteTablet rt =
+        client.getTableLocationEntry(table.getTableId(), insert.partitionKey()).getTablet();
     String tabletId = rt.getTabletIdAsString();
     TabletClient tc = client.clientFor(rt);
     try {

http://git-wip-us.apache.org/repos/asf/kudu/blob/acf093c1/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
index 97362a6..6affc33 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java
@@ -46,7 +46,7 @@ public class TestKuduTable extends BaseKuduTest {
 
   @Test(timeout = 100000)
   public void testAlterTable() throws Exception {
-    String tableName = name.getMethodName();
+    String tableName = name.getMethodName() + System.currentTimeMillis();
     createTable(tableName, basicSchema, getBasicCreateTableOptions());
     try {
 
@@ -232,7 +232,7 @@ public class TestKuduTable extends BaseKuduTest {
 
   @Test(timeout = 100000)
   public void testLocateTableNonCoveringRange() throws Exception {
-    String tableName = name.getMethodName();
+    String tableName = name.getMethodName() + System.currentTimeMillis();
     syncClient.createTable(tableName, basicSchema, getBasicTableOptionsWithNonCoveredRange());
     KuduTable table = syncClient.openTable(tableName);
 
@@ -277,6 +277,52 @@ public class TestKuduTable extends BaseKuduTest {
     return row.encodePrimaryKey();
   }
 
+  @Test(timeout = 100000)
+  public void testAlterTableNonCoveringRange() throws Exception {
+    String tableName = name.getMethodName() + System.currentTimeMillis();
+    syncClient.createTable(tableName, basicSchema, getBasicTableOptionsWithNonCoveredRange());
+    KuduTable table = syncClient.openTable(tableName);
+    KuduSession session = syncClient.newSession();
+
+    AlterTableOptions ato = new AlterTableOptions();
+    PartialRow bLowerBound = schema.newPartialRow();
+    bLowerBound.addInt("key", 300);
+    PartialRow bUpperBound = schema.newPartialRow();
+    bUpperBound.addInt("key", 400);
+    ato.addRangePartition(bLowerBound, bUpperBound);
+    syncClient.alterTable(tableName, ato);
+
+    Insert insert = createBasicSchemaInsert(table, 301);
+    session.apply(insert);
+
+    List<LocatedTablet> tablets;
+
+    // Only the newly added [300, 400) range partition lies at or above key 300.
+    tablets = table.getTabletsLocations(getKeyInBytes(300), null, 100000);
+    assertEquals(1, tablets.size());
+    assertArrayEquals(getKeyInBytes(300), tablets.get(0).getPartition().getPartitionKeyStart());
+    assertArrayEquals(getKeyInBytes(400), tablets.get(0).getPartition().getPartitionKeyEnd());
+
+    insert = createBasicSchemaInsert(table, 201);
+    session.apply(insert);
+
+    ato = new AlterTableOptions();
+    bLowerBound = schema.newPartialRow();
+    bLowerBound.addInt("key", 200);
+    bUpperBound = schema.newPartialRow();
+    bUpperBound.addInt("key", 300);
+    ato.dropRangePartition(bLowerBound, bUpperBound);
+    syncClient.alterTable(tableName, ato);
+
+    insert = createBasicSchemaInsert(table, 202);
+    try {
+      session.apply(insert);
+      fail("Should get a non-recoverable");
+    } catch (NonCoveredRangeException e) {
+      // Expected.
+    }
+  }
+
   public KuduTable createTableWithSplitsAndTest(int splitsCount) throws Exception {
     String tableName = name.getMethodName() + System.currentTimeMillis();
     CreateTableOptions builder = getBasicCreateTableOptions();

http://git-wip-us.apache.org/repos/asf/kudu/blob/acf093c1/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index e8f0da5..e22fb47 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1277,15 +1277,15 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
           if (existing_iter != existing_tablets.end()) {
             TabletMetadataLock metadata(existing_iter->second, TabletMetadataLock::READ);
             if (metadata.data().pb.partition().partition_key_start() < upper_bound) {
-              return Status::NotFound("New partition conflicts with existing partition",
-                                      step.ShortDebugString());
+              return Status::InvalidArgument("New partition conflicts with existing partition",
+                                             step.ShortDebugString());
             }
           }
           if (existing_iter != existing_tablets.begin()) {
             TabletMetadataLock metadata(std::prev(existing_iter)->second, TabletMetadataLock::READ);
             if (metadata.data().pb.partition().partition_key_end() > lower_bound) {
-              return Status::NotFound("New partition conflicts with existing partition",
-                                      step.ShortDebugString());
+              return Status::InvalidArgument("New partition conflicts with existing partition",
+                                             step.ShortDebugString());
             }
           }
 
@@ -1294,15 +1294,15 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
           if (new_iter != new_tablets.end()) {
             const auto& metadata = new_iter->second->mutable_metadata()->dirty();
             if (metadata.pb.partition().partition_key_start() < upper_bound) {
-              return Status::NotFound("New partition conflicts with another new partition",
-                                      step.ShortDebugString());
+              return Status::InvalidArgument("New partition conflicts with another new partition",
+                                             step.ShortDebugString());
             }
           }
           if (new_iter != new_tablets.begin()) {
             const auto& metadata = std::prev(new_iter)->second->mutable_metadata()->dirty();
             if (metadata.pb.partition().partition_key_end() > lower_bound) {
-              return Status::NotFound("New partition conflicts with another new partition",
-                                      step.ShortDebugString());
+              return Status::InvalidArgument("New partition conflicts with another new partition",
+                                             step.ShortDebugString());
             }
           }
 
@@ -1344,8 +1344,8 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
           } else if (found_new) {
             new_tablets.erase(new_iter);
           } else {
-            return Status::NotFound("No tablet found for drop partition step",
-                                    step.ShortDebugString());
+            return Status::InvalidArgument("No tablet found for drop partition step",
+                                           step.ShortDebugString());
           }
         }
         break;


Mime
View raw message