accumulo-commits mailing list archives

From ktur...@apache.org
Subject [accumulo] branch master updated: ACCUMULO-4813 Add ability to provide a load plan for bulk import (#607)
Date Thu, 13 Sep 2018 22:02:55 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ef140e  ACCUMULO-4813 Add ability to provide a load plan for bulk import (#607)
7ef140e is described below

commit 7ef140ec40c3768859b848350db8c6d6d20f7a56
Author: Keith Turner <kturner@apache.org>
AuthorDate: Thu Sep 13 18:00:40 2018 -0400

    ACCUMULO-4813 Add ability to provide a load plan for bulk import (#607)
---
 .../core/client/admin/TableOperations.java         |  54 +++--
 .../accumulo/core/client/impl/BulkImport.java      | 245 +++++++++++++++++----
 .../apache/accumulo/core/conf/ClientProperty.java  |   3 +-
 .../org/apache/accumulo/core/data/LoadPlan.java    | 223 +++++++++++++++++++
 .../apache/accumulo/core/file/FileOperations.java  |  13 +-
 .../file/blockfile/impl/CachableBlockFile.java     |   9 +-
 .../apache/accumulo/core/data/LoadPlanTest.java    | 105 +++++++++
 .../org/apache/accumulo/proxy/ProxyServer.java     |   6 +-
 .../org/apache/accumulo/server/util/FileUtil.java  |   2 +-
 .../shell/commands/ImportDirectoryCommand.java     |   5 +-
 .../accumulo/test/functional/BulkFileIT.java       |  41 ++--
 .../accumulo/test/functional/BulkLoadIT.java       |  77 ++++++-
 12 files changed, 687 insertions(+), 96 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
index 0970c17..4de0abe 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.client.rfile.RFile;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
 import org.apache.accumulo.core.client.summary.Summarizer;
 import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.data.LoadPlan;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.security.Authorizations;
@@ -620,9 +621,19 @@ public interface TableOperations {
   /**
    * @since 2.0.0
    */
-  interface ImportSourceOptions {
-    ImportSourceOptions settingLogicalTime();
+  interface ImportOptions {
 
+    /**
+     * Use the table's next timestamp to override all timestamps in the imported files. The type
+     * of timestamp used depends on how the table was created.
+     *
+     * @see NewTableConfiguration#setTimeType(TimeType)
+     */
+    ImportDestinationOptions tableTime();
+
+    /**
+     * Loads the files into the table.
+     */
     void load()
         throws TableNotFoundException, IOException, AccumuloException, AccumuloSecurityException;
   }
@@ -630,28 +641,46 @@ public interface TableOperations {
   /**
    * @since 2.0.0
    */
-  interface ImportExecutorOptions extends ImportSourceOptions {
+  interface ImportDestinationOptions extends ImportOptions {
+
+    /**
+     * This is the default number of threads used to determine where to load files. A suffix of
+     * {@code C} means to multiply by the number of cores.
+     */
+    public static final String BULK_LOAD_THREADS_DEFAULT = "8C";
+
+    /**
+     * Load files in the directory to the row ranges specified in the plan. The plan should contain
+     * at least one entry for every file in the directory. When this option is specified, the files
+     * are never examined, so it is possible to send files to the wrong tablet.
+     */
+    ImportOptions plan(LoadPlan plan);
+
+    // The javadoc below intentionally uses a qualified class name in the value tag, otherwise
+    // it would not render properly.
     /**
      * Files are examined to determine where to load them. This examination is done in the current
-     * process using multiple threads. If this property is not set, then the client property
-     * {@code bulk.threads} is used to create a thread pool.
+     * process using multiple threads. If this method is not called, then the client property
+     * {@code bulk.threads} is used to create a thread pool. This property defaults to
+     * {@value ImportDestinationOptions#BULK_LOAD_THREADS_DEFAULT}.
      *
      * @param service
     *          Use this executor to run file examination tasks
-     * @return ImportSourceOptions
      */
-    ImportSourceOptions usingExecutor(Executor service);
+    ImportOptions executor(Executor service);
 
+    // The javadoc below intentionally uses a fully qualified class name in the value tag, otherwise
+    // it would not render properly.
     /**
      * Files are examined to determine where to load them. This examination is done in the current
-     * process using multiple threads. If this property is not set, then the client property
-     * {@code bulk.threads} is used to create a thread pool.
+     * process using multiple threads. If this method is not called, then the client property
+     * {@code bulk.threads} is used to create a thread pool. This property defaults to
+     * {@value org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationOptions#BULK_LOAD_THREADS_DEFAULT}.
      *
      * @param numThreads
     *          Create a thread pool with this many threads to run file examination tasks.
-     * @return ImportSourceOptions
      */
-    ImportSourceOptions usingThreads(int numThreads);
+    ImportOptions threads(int numThreads);
   }
 
   /**
@@ -662,9 +691,8 @@ public interface TableOperations {
      *
      * @param directory
      *          Load files from this directory
-     * @return ImportSourceOptions
      */
-    ImportExecutorOptions from(String directory);
+    ImportDestinationOptions from(String directory);
   }
 
   /**
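For orientation, the renamed fluent API above chains as follows: addFilesTo(table).from(dir) yields ImportDestinationOptions, whose plan()/executor()/threads() methods narrow to ImportOptions, and load() submits the FATE operation. A minimal sketch of a caller, assuming an existing Connector c (the table and directory names are illustrative, not from this commit):

    import org.apache.accumulo.core.client.Connector;

    class BulkLoadSketch {
      static void run(Connector c) throws Exception {
        c.tableOperations().addFilesTo("mytable") // hypothetical table
            .from("/tmp/bulk1")                   // hypothetical directory of rfiles
            .threads(4)                           // examine files with 4 threads
            .tableTime()                          // stamp entries with the table's time
            .load();
      }
    }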
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/client/impl/BulkImport.java
index 8db4acc..7e9544c 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/BulkImport.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/BulkImport.java
@@ -17,6 +17,7 @@
 package org.apache.accumulo.core.client.impl;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.groupingBy;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -26,6 +27,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -39,6 +41,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.stream.Stream;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -47,19 +50,25 @@ import org.apache.accumulo.core.client.NamespaceExistsException;
 import org.apache.accumulo.core.client.NamespaceNotFoundException;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.admin.TableOperations.ImportExecutorOptions;
+import org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationOptions;
 import org.apache.accumulo.core.client.admin.TableOperations.ImportSourceArguments;
-import org.apache.accumulo.core.client.admin.TableOperations.ImportSourceOptions;
+import org.apache.accumulo.core.client.impl.Bulk.FileInfo;
+import org.apache.accumulo.core.client.impl.Bulk.Files;
+import org.apache.accumulo.core.client.impl.Table.ID;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.LoadPlan;
+import org.apache.accumulo.core.data.LoadPlan.Destination;
+import org.apache.accumulo.core.data.LoadPlan.RangeType;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.master.thrift.FateOperation;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.volume.VolumeConfiguration;
@@ -71,8 +80,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Sets;
 
-public class BulkImport implements ImportSourceArguments, ImportExecutorOptions {
+public class BulkImport implements ImportSourceArguments, ImportDestinationOptions {
 
   private static final Logger log = LoggerFactory.getLogger(BulkImport.class);
 
@@ -84,13 +96,15 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
   private final ClientContext context;
   private final String tableName;
 
+  private LoadPlan plan = null;
+
   BulkImport(String tableName, ClientContext context) {
     this.context = context;
     this.tableName = Objects.requireNonNull(tableName);
   }
 
   @Override
-  public ImportSourceOptions settingLogicalTime() {
+  public ImportDestinationOptions tableTime() {
     this.setTime = true;
     return this;
   }
@@ -109,34 +123,19 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
 
     Path srcPath = checkPath(fs, dir);
 
-    Executor executor;
-    ExecutorService service = null;
-
-    if (this.executor != null) {
-      executor = this.executor;
-    } else if (numThreads > 0) {
-      executor = service = Executors.newFixedThreadPool(numThreads);
+    SortedMap<KeyExtent,Bulk.Files> mappings;
+    if (plan == null) {
+      mappings = computeMappingFromFiles(fs, tableId, srcPath);
     } else {
-      String threads = context.getConfiguration().get(ClientProperty.BULK_LOAD_THREADS.getKey());
-      executor = service = Executors
-          .newFixedThreadPool(ConfigurationTypeHelper.getNumThreads(threads));
+      mappings = computeMappingFromPlan(fs, tableId, srcPath);
     }
 
-    try {
-      SortedMap<KeyExtent,Bulk.Files> mappings = computeFileToTabletMappings(fs, tableId, srcPath,
-          executor, context);
-
-      BulkSerialize.writeLoadMapping(mappings, srcPath.toString(), fs::create);
+    BulkSerialize.writeLoadMapping(mappings, srcPath.toString(), fs::create);
 
-      List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getUtf8()),
-          ByteBuffer.wrap(srcPath.toString().getBytes(UTF_8)),
-          ByteBuffer.wrap((setTime + "").getBytes(UTF_8)));
-      doFateOperation(FateOperation.TABLE_BULK_IMPORT2, args, Collections.emptyMap(), tableName);
-    } finally {
-      if (service != null) {
-        service.shutdown();
-      }
-    }
+    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getUtf8()),
+        ByteBuffer.wrap(srcPath.toString().getBytes(UTF_8)),
+        ByteBuffer.wrap((setTime + "").getBytes(UTF_8)));
+    doFateOperation(FateOperation.TABLE_BULK_IMPORT2, args, Collections.emptyMap(), tableName);
   }
 
   /**
@@ -164,17 +163,20 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
       throw new AccumuloException(
           "Bulk import directory " + dir + " does not exist or has bad permissions", fnf);
     }
+
+    // TODO ensure dir does not contain bulk load mapping
+
     return ret;
   }
 
   @Override
-  public ImportSourceOptions usingExecutor(Executor service) {
+  public ImportDestinationOptions executor(Executor service) {
     this.executor = Objects.requireNonNull(service);
     return this;
   }
 
   @Override
-  public ImportSourceOptions usingThreads(int numThreads) {
+  public ImportDestinationOptions threads(int numThreads) {
     Preconditions.checkArgument(numThreads > 0, "Non positive number of threads given : %s",
         numThreads);
     this.numThreads = numThreads;
@@ -182,7 +184,13 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
   }
 
   @Override
-  public ImportExecutorOptions from(String directory) {
+  public ImportDestinationOptions plan(LoadPlan plan) {
+    this.plan = plan;
+    return this;
+  }
+
+  @Override
+  public ImportDestinationOptions from(String directory) {
     this.dir = Objects.requireNonNull(directory);
     return this;
   }
@@ -198,7 +206,12 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
   }
 
   public static Map<KeyExtent,Long> estimateSizes(AccumuloConfiguration acuConf, Path mapFile,
-      long fileSize, Collection<KeyExtent> extents, FileSystem ns) throws IOException {
+      long fileSize, Collection<KeyExtent> extents, FileSystem ns, Cache<String,Long> fileLenCache)
+      throws IOException {
+
+    if (extents.size() == 1) {
+      return Collections.singletonMap(extents.iterator().next(), fileSize);
+    }
 
     long totalIndexEntries = 0;
     Map<KeyExtent,MLong> counts = new TreeMap<>();
@@ -208,7 +221,8 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
     Text row = new Text();
 
     FileSKVIterator index = FileOperations.getInstance().newIndexReaderBuilder()
-        .forFile(mapFile.toString(), ns, ns.getConf()).withTableConfiguration(acuConf).build();
+        .forFile(mapFile.toString(), ns, ns.getConf()).withTableConfiguration(acuConf)
+        .withFileLenCache(fileLenCache).build();
 
     try {
       while (index.hasTop()) {
@@ -249,8 +263,12 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
   }
 
   public static List<KeyExtent> findOverlappingTablets(ClientContext context,
-      KeyExtentCache extentCache, Text startRow, Text endRow, FileSKVIterator reader)
+      KeyExtentCache extentCache, FileSKVIterator reader)
       throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
+
+    Text startRow = null;
+    Text endRow = null;
+
     List<KeyExtent> result = new ArrayList<>();
     Collection<ByteSequence> columnFamilies = Collections.emptyList();
     Text row = startRow;
@@ -269,8 +287,7 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
       result.add(extent);
       row = extent.getEndRow();
       if (row != null && (endRow == null || row.compareTo(endRow) < 0)) {
-        row = new Text(row);
-        row.append(byte0, 0, byte0.length);
+        row = nextRow(row);
       } else
         break;
     }
@@ -278,14 +295,156 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
     return result;
   }
 
+  private static Text nextRow(Text row) {
+    Text next = new Text(row);
+    next.append(byte0, 0, byte0.length);
+    return next;
+  }
+
   public static List<KeyExtent> findOverlappingTablets(ClientContext context,
-      KeyExtentCache extentCache, Path file, FileSystem fs)
+      KeyExtentCache extentCache, Path file, FileSystem fs, Cache<String,Long> fileLenCache)
       throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
     try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
         .forFile(file.toString(), fs, fs.getConf())
-        .withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) {
-      return findOverlappingTablets(context, extentCache, null, null, reader);
+        .withTableConfiguration(context.getConfiguration()).withFileLenCache(fileLenCache)
+        .seekToBeginning().build()) {
+      return findOverlappingTablets(context, extentCache, reader);
+    }
+  }
+
+  private static Map<String,Long> getFileLenMap(FileStatus[] statuses) {
+    HashMap<String,Long> fileLens = new HashMap<>();
+    for (FileStatus status : statuses) {
+      fileLens.put(status.getPath().getName(), status.getLen());
+    }
+
+    return fileLens;
+  }
+
+  private static Cache<String,Long> getPopulatedFileLenCache(Path dir, FileStatus[] statuses) {
+    Map<String,Long> fileLens = getFileLenMap(statuses);
+
+    Map<String,Long> absFileLens = new HashMap<>();
+    fileLens.forEach((k, v) -> {
+      absFileLens.put(CachableBlockFile.pathToCacheId(new Path(dir, k)), v);
+    });
+
+    Cache<String,Long> fileLenCache = CacheBuilder.newBuilder().build();
+
+    fileLenCache.putAll(absFileLens);
+
+    return fileLenCache;
+  }
+
+  private SortedMap<KeyExtent,Files> computeMappingFromPlan(FileSystem fs, ID tableId, Path srcPath)
+      throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
+
+    Map<String,List<Destination>> fileDestinations = plan.getDestinations().stream()
+        .collect(groupingBy(Destination::getFileName));
+
+    FileStatus[] statuses = fs.listStatus(srcPath,
+        p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING));
+
+    Map<String,Long> fileLens = getFileLenMap(statuses);
+
+    if (!fileDestinations.keySet().equals(fileLens.keySet())) {
+      throw new IllegalArgumentException(
+          "Load plan files differ from directory files, symmetric difference : "
+              + Sets.symmetricDifference(fileDestinations.keySet(), fileLens.keySet()));
+    }
+
+    KeyExtentCache extentCache = new ConcurrentKeyExtentCache(tableId, context);
+
+    // Pre-populate cache by looking up all end rows in sorted order. Doing this in sorted order
+    // leverages read ahead.
+    fileDestinations.values().stream().flatMap(List::stream)
+        .filter(dest -> dest.getRangeType() == RangeType.FILE)
+        .flatMap(dest -> Stream.of(dest.getStartRow(), dest.getEndRow())).filter(row -> row != null)
+        .map(Text::new).sorted().distinct().forEach(row -> {
+          try {
+            extentCache.lookup(row);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+    SortedMap<KeyExtent,Files> mapping = new TreeMap<>();
+
+    for (Entry<String,List<Destination>> entry : fileDestinations.entrySet()) {
+      String fileName = entry.getKey();
+      List<Destination> destinations = entry.getValue();
+      Set<KeyExtent> extents = mapDestinationsToExtents(tableId, extentCache, destinations);
+
+      long estSize = (long) (fileLens.get(fileName) / (double) extents.size());
+
+      for (KeyExtent keyExtent : extents) {
+        mapping.computeIfAbsent(keyExtent, k -> new Files())
+            .add(new FileInfo(fileName, estSize, 0));
+      }
+    }
+
+    return mergeOverlapping(mapping);
+  }
+
+  private Text toText(byte[] row) {
+    return row == null ? null : new Text(row);
+  }
+
+  private Set<KeyExtent> mapDestinationsToExtents(Table.ID tableId, KeyExtentCache kec,
+      List<Destination> destinations)
+      throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    Set<KeyExtent> extents = new HashSet<>();
+
+    for (Destination dest : destinations) {
+
+      if (dest.getRangeType() == RangeType.TABLE) {
+        extents.add(new KeyExtent(tableId, toText(dest.getEndRow()), toText(dest.getStartRow())));
+      } else if (dest.getRangeType() == RangeType.FILE) {
+        Text startRow = new Text(dest.getStartRow());
+        Text endRow = new Text(dest.getEndRow());
+
+        KeyExtent extent = kec.lookup(startRow);
+
+        extents.add(extent);
+
+        while (!extent.contains(endRow) && extent.getEndRow() != null) {
+          extent = kec.lookup(nextRow(extent.getEndRow()));
+          extents.add(extent);
+        }
+
+      } else {
+        throw new IllegalStateException();
+      }
     }
+
+    return extents;
+  }
+
+  private SortedMap<KeyExtent,Bulk.Files> computeMappingFromFiles(FileSystem fs, Table.ID tableId,
+      Path dirPath) throws IOException {
+
+    Executor executor;
+    ExecutorService service = null;
+
+    if (this.executor != null) {
+      executor = this.executor;
+    } else if (numThreads > 0) {
+      executor = service = Executors.newFixedThreadPool(numThreads);
+    } else {
+      String threads = context.getConfiguration().get(ClientProperty.BULK_LOAD_THREADS.getKey());
+      executor = service = Executors
+          .newFixedThreadPool(ConfigurationTypeHelper.getNumThreads(threads));
+    }
+
+    try {
+      return computeFileToTabletMappings(fs, tableId, dirPath, executor, context);
+    } finally {
+      if (service != null) {
+        service.shutdown();
+      }
+    }
   }
 
   public static SortedMap<KeyExtent,Bulk.Files> computeFileToTabletMappings(FileSystem fs,
@@ -296,6 +455,10 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
     FileStatus[] files = fs.listStatus(dirPath,
         p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING));
 
+    // we know all of the file lengths, so construct a cache and populate it to avoid later
+    // trips to the namenode
+    Cache<String,Long> fileLensCache = getPopulatedFileLenCache(dirPath, files);
+
     List<CompletableFuture<Map<KeyExtent,Bulk.FileInfo>>> futures = new ArrayList<>();
 
     for (FileStatus fileStatus : files) {
@@ -303,9 +466,9 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
         try {
           long t1 = System.currentTimeMillis();
           List<KeyExtent> extents = findOverlappingTablets(context, extentCache,
-              fileStatus.getPath(), fs);
+              fileStatus.getPath(), fs, fileLensCache);
           Map<KeyExtent,Long> estSizes = estimateSizes(context.getConfiguration(),
-              fileStatus.getPath(), fileStatus.getLen(), extents, fs);
+              fileStatus.getPath(), fileStatus.getLen(), extents, fs, fileLensCache);
           Map<KeyExtent,Bulk.FileInfo> pathLocations = new HashMap<>();
           for (KeyExtent ke : extents) {
             pathLocations.put(ke,
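One detail worth flagging in computeMappingFromPlan above: a load plan carries no per-tablet size information, so when a file maps to several tablets its length is split evenly across them. A worked example of that estimate with illustrative numbers:

    // estSize = (long) (fileLens.get(fileName) / (double) extents.size());
    long fileLen = 300_000_000L; // hypothetical 300 MB file
    int numExtents = 3;          // the plan maps it to 3 tablets
    long estSize = (long) (fileLen / (double) numExtents); // 100,000,000 per tablet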
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
index 0b47e7f..a5229f6 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
@@ -25,6 +25,7 @@ import java.util.Objects;
 import java.util.Properties;
 
 import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationOptions;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.CredentialProviderToken;
 import org.apache.accumulo.core.client.security.tokens.DelegationToken;
@@ -70,7 +71,7 @@ public enum ClientProperty {
       "Number of concurrent query threads to spawn for querying"),
 
   // Bulk load
-  BULK_LOAD_THREADS("bulk.threads", "8C",
+  BULK_LOAD_THREADS("bulk.threads", ImportDestinationOptions.BULK_LOAD_THREADS_DEFAULT,
       "The number of threads used to inspect bulk load files to determine where files go.  "
           + "If the value ends with C, then it will be multiplied by the number of cores on the "
           + "system. This property is only used by the bulk import API introduced in 2.0.0."),
diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java
new file mode 100644
index 0000000..19e4659
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.data;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationOptions;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.UnsignedBytes;
+
+/**
+ * Information about where to load files into an Accumulo table.
+ *
+ * @see ImportDestinationOptions#plan(LoadPlan)
+ * @since 2.0.0
+ */
+public class LoadPlan {
+  private final ImmutableList<Destination> destinations;
+
+  private static byte[] copy(byte[] data) {
+    return data == null ? null : Arrays.copyOf(data, data.length);
+  }
+
+  private static byte[] copy(Text data) {
+    return data == null ? null : data.copyBytes();
+  }
+
+  private static byte[] copy(CharSequence data) {
+    return data == null ? null : data.toString().getBytes(StandardCharsets.UTF_8);
+  }
+
+  private static String checkFileName(String fileName) {
+    Preconditions.checkArgument(Paths.get(fileName).getNameCount() == 1,
+        "Expected only filename, but got %s", fileName);
+    return fileName;
+  }
+
+  /**
+   * @since 2.0.0
+   */
+  public enum RangeType {
+    /**
+     * Range that corresponds to one or more tablets in a table. For a range of this type, the
+     * start row and end row can be null. The start row is exclusive and the end row is inclusive
+     * (like Accumulo tablets). A common use case is files that were partitioned using a table's
+     * splits. When using this range type, the start and end rows must exist as splits in the
+     * table or an exception will be thrown at load time.
+     */
+    TABLE,
+    /**
+     * Range that corresponds to known rows in a file. For this range type, the start row and end
+     * row must be non-null; both are considered inclusive. At load time these data ranges will be
+     * mapped to tablet ranges.
+     */
+    FILE
+  }
+
+  /**
+   * Mapping of a file to a row range with an associated range type.
+   *
+   * @since 2.0.0
+   */
+  public static class Destination {
+
+    private final String fileName;
+    private final byte[] startRow;
+    private final byte[] endRow;
+    private final RangeType rangeType;
+
+    private byte[] checkRow(RangeType type, byte[] row) {
+      if (type == RangeType.FILE && row == null) {
+        throw new IllegalArgumentException(
+            "Row can not be null when range type is " + RangeType.FILE);
+      }
+      return row;
+    }
+
+    private Destination(String fileName, RangeType rangeType, byte[] startRow, byte[] endRow) {
+      this.fileName = checkFileName(fileName);
+      this.rangeType = rangeType;
+      this.startRow = checkRow(rangeType, startRow);
+      this.endRow = checkRow(rangeType, endRow);
+
+      if (rangeType == RangeType.FILE) {
+        if (UnsignedBytes.lexicographicalComparator().compare(startRow, endRow) > 0) {
+          String srs = new String(startRow, StandardCharsets.UTF_8);
+          String ers = new String(endRow, StandardCharsets.UTF_8);
+          throw new IllegalArgumentException(
+              "Start row is greater than end row : " + srs + " " + ers);
+        }
+      } else if (rangeType == RangeType.TABLE) {
+        if (startRow != null && endRow != null
+            && UnsignedBytes.lexicographicalComparator().compare(startRow, endRow) >= 0) {
+          String srs = new String(startRow, StandardCharsets.UTF_8);
+          String ers = new String(endRow, StandardCharsets.UTF_8);
+          throw new IllegalArgumentException(
+              "Start row is greater than or equal to end row : " + srs + " " + ers);
+        }
+      } else {
+        throw new RuntimeException();
+      }
+
+    }
+
+    public String getFileName() {
+      return fileName;
+    }
+
+    public byte[] getStartRow() {
+      return copy(startRow);
+    }
+
+    public byte[] getEndRow() {
+      return copy(endRow);
+    }
+
+    public RangeType getRangeType() {
+      return rangeType;
+    }
+  }
+
+  private LoadPlan(ImmutableList<Destination> destinations) {
+    this.destinations = destinations;
+  }
+
+  public Collection<Destination> getDestinations() {
+    return destinations;
+  }
+
+  /**
+   * @since 2.0.0
+   */
+  public interface Builder {
+    /**
+     * Specify the row range where a file should be loaded.
+     *
+     * @param fileName
+     *          this should be a file name only, not a path, because loads are expected to happen
+     *          from a single directory.
+     */
+    Builder loadFileTo(String fileName, RangeType rangeType, Text startRow, Text endRow);
+
+    /**
+     * Specify the row range where a file should be loaded.
+     *
+     * @param fileName
+     *          this should be a file name only, not a path, because loads are expected to happen
+     *          from a single directory.
+     */
+    Builder loadFileTo(String fileName, RangeType rangeType, byte[] startRow, byte[] endRow);
+
+    /**
+     * Specify the row range where a file should be loaded.
+     *
+     * @param fileName
+     *          this should be a file name only, not a path, because loads are expected to happen
+     *          from a single directory.
+     */
+    Builder loadFileTo(String fileName, RangeType rangeType, CharSequence startRow,
+        CharSequence endRow);
+
+    Builder addPlan(LoadPlan plan);
+
+    LoadPlan build();
+  }
+
+  public static Builder builder() {
+    return new Builder() {
+      ImmutableList.Builder<Destination> fmb = ImmutableList.builder();
+
+      @Override
+      public Builder loadFileTo(String fileName, RangeType rangeType, Text startRow, Text endRow) {
+        fmb.add(new Destination(fileName, rangeType, copy(startRow), copy(endRow)));
+        return this;
+      }
+
+      @Override
+      public Builder loadFileTo(String fileName, RangeType rangeType, byte[] startRow,
+          byte[] endRow) {
+        fmb.add(new Destination(fileName, rangeType, copy(startRow), copy(endRow)));
+        return this;
+      }
+
+      @Override
+      public Builder loadFileTo(String fileName, RangeType rangeType, CharSequence startRow,
+          CharSequence endRow) {
+        fmb.add(new Destination(fileName, rangeType, copy(startRow), copy(endRow)));
+        return this;
+      }
+
+      @Override
+      public Builder addPlan(LoadPlan plan) {
+        fmb.addAll(plan.getDestinations());
+        return this;
+      }
+
+      @Override
+      public LoadPlan build() {
+        return new LoadPlan(fmb.build());
+      }
+    };
+  }
+}
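Putting the new class together, a short sketch of building a plan for two files (file names and rows are illustrative; the TABLE range's rows must match existing splits, while the FILE range's rows are inclusive data bounds mapped to tablets at load time):

    import org.apache.accumulo.core.data.LoadPlan;
    import org.apache.accumulo.core.data.LoadPlan.RangeType;

    LoadPlan plan = LoadPlan.builder()
        .loadFileTo("f1.rf", RangeType.TABLE, null, "row0333")     // (-inf, row0333]
        .loadFileTo("f2.rf", RangeType.FILE, "row0500", "row0999") // inclusive row range
        .build();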
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index cfa4d73..1ff0e22 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -317,9 +317,9 @@ public abstract class FileOperations {
           true);
     }
 
-    protected FileOptions toIndexReaderBuilderOptions() {
+    protected FileOptions toIndexReaderBuilderOptions(Cache<String,Long> fileLenCache) {
       return new FileOptions(tableConfiguration, filename, fs, fsConf, rateLimiter, null, null,
-          false, null, null, null, false, cryptoService, null, null, true);
+          false, null, null, fileLenCache, false, cryptoService, null, null, true);
     }
 
     protected FileOptions toScanReaderBuilderOptions(Range range, Set<ByteSequence> columnFamilies,
@@ -484,6 +484,8 @@ public abstract class FileOperations {
    */
   public class IndexReaderBuilder extends FileHelper implements IndexReaderTableConfiguration {
 
+    private Cache<String,Long> fileLenCache = null;
+
     public IndexReaderTableConfiguration forFile(String filename, FileSystem fs,
         Configuration fsConf) {
       filename(filename).fs(fs).fsConf(fsConf);
@@ -496,8 +498,13 @@ public abstract class FileOperations {
       return this;
     }
 
+    public IndexReaderBuilder withFileLenCache(Cache<String,Long> fileLenCache) {
+      this.fileLenCache = fileLenCache;
+      return this;
+    }
+
     public FileSKVIterator build() throws IOException {
-      return openIndex(toIndexReaderBuilderOptions());
+      return openIndex(toIndexReaderBuilderOptions(fileLenCache));
     }
   }
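The new withFileLenCache option mirrors the estimateSizes call site earlier in this commit. A sketch of a caller that seeds the cache before opening an index reader (variable names are illustrative):

    Cache<String,Long> fileLenCache = CacheBuilder.newBuilder().build();
    fileLenCache.put(CachableBlockFile.pathToCacheId(path), fileLen);
    FileSKVIterator index = FileOperations.getInstance().newIndexReaderBuilder()
        .forFile(path.toString(), fs, fs.getConf())
        .withTableConfiguration(tableConf)
        .withFileLenCache(fileLenCache)
        .build();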
 
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index eaef448..b13ccbe 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -62,6 +62,10 @@ public class CachableBlockFile {
     T get() throws IOException;
   }
 
+  public static String pathToCacheId(Path p) {
+    return p.toString();
+  }
+
   /**
    * Class wraps the BCFile reader.
    */
@@ -325,8 +329,9 @@ public class CachableBlockFile {
         BlockCache data, BlockCache index, RateLimiter readLimiter,
         AccumuloConfiguration accumuloConfiguration, CryptoService cryptoService)
         throws IOException {
-      this(dataFile.toString(), () -> fs.open(dataFile), () -> fs.getFileStatus(dataFile).getLen(),
-          fileLenCache, data, index, readLimiter, conf, accumuloConfiguration, cryptoService);
+      this(pathToCacheId(dataFile), () -> fs.open(dataFile),
+          () -> fs.getFileStatus(dataFile).getLen(), fileLenCache, data, index, readLimiter, conf,
+          accumuloConfiguration, cryptoService);
     }
 
     public <InputStreamType extends InputStream & Seekable> Reader(String cacheId,
diff --git a/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java b/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java
new file mode 100644
index 0000000..fd619d1
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.data;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toSet;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.LoadPlan.Destination;
+import org.apache.accumulo.core.data.LoadPlan.RangeType;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class LoadPlanTest {
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadRange1() {
+    LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, "a", "a").build();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadRange2() {
+    LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, "b", "a").build();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadRange3() {
+    LoadPlan.builder().loadFileTo("f1.rf", RangeType.FILE, "b", "a").build();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadRange4() {
+    LoadPlan.builder().loadFileTo("f1.rf", RangeType.FILE, null, "a").build();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadRange5() {
+    LoadPlan.builder().loadFileTo("f1.rf", RangeType.FILE, "a", null).build();
+  }
+
+  @Test
+  public void testTypes() {
+    LoadPlan loadPlan = LoadPlan.builder().loadFileTo("f1.rf", RangeType.FILE, "1112", "1145")
+        .loadFileTo("f2.rf", RangeType.FILE, "abc".getBytes(UTF_8), "def".getBytes(UTF_8))
+        .loadFileTo("f3.rf", RangeType.FILE, new Text("368"), new Text("479"))
+        .loadFileTo("f4.rf", RangeType.TABLE, null, "aaa")
+        .loadFileTo("f5.rf", RangeType.TABLE, "yyy", null)
+        .loadFileTo("f6.rf", RangeType.TABLE, null, "bbb".getBytes(UTF_8))
+        .loadFileTo("f7.rf", RangeType.TABLE, "www".getBytes(UTF_8), null)
+        .loadFileTo("f8.rf", RangeType.TABLE, null, new Text("ccc"))
+        .loadFileTo("f9.rf", RangeType.TABLE, new Text("xxx"), null)
+        .loadFileTo("fa.rf", RangeType.TABLE, "1138", "1147")
+        .loadFileTo("fb.rf", RangeType.TABLE, "heg".getBytes(UTF_8), "klt".getBytes(UTF_8))
+        .loadFileTo("fc.rf", RangeType.TABLE, new Text("agt"), new Text("ctt"))
+        .addPlan(
+            LoadPlan.builder().loadFileTo("fd.rf", RangeType.TABLE, (String) null, null).build())
+        .build();
+
+    Set<String> expected = new HashSet<>();
+    expected.add("f1.rf:DATA:1112:1145");
+    expected.add("f2.rf:DATA:abc:def");
+    expected.add("f3.rf:DATA:368:479");
+    expected.add("f4.rf:TABLET:null:aaa");
+    expected.add("f5.rf:TABLET:yyy:null");
+    expected.add("f6.rf:TABLET:null:bbb");
+    expected.add("f7.rf:TABLET:www:null");
+    expected.add("f8.rf:TABLET:null:ccc");
+    expected.add("f9.rf:TABLET:xxx:null");
+    expected.add("fa.rf:TABLET:1138:1147");
+    expected.add("fb.rf:TABLET:heg:klt");
+    expected.add("fc.rf:TABLET:agt:ctt");
+    expected.add("fd.rf:TABLET:null:null");
+
+    Set<String> actual = loadPlan.getDestinations().stream().map(LoadPlanTest::toString)
+        .collect(toSet());
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  private static String toString(Destination d) {
+    return d.getFileName() + ":" + d.getRangeType() + ":" + toString(d.getStartRow()) + ":"
+        + toString(d.getEndRow());
+  }
+
+  private static String toString(byte[] r) {
+    return r == null ? null : new String(r, UTF_8);
+  }
+}
diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
index 947d401..9a7caa7 100644
--- a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
@@ -59,7 +59,7 @@ import org.apache.accumulo.core.client.admin.ActiveCompaction;
 import org.apache.accumulo.core.client.admin.ActiveScan;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
-import org.apache.accumulo.core.client.admin.TableOperations.ImportExecutorOptions;
+import org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationOptions;
 import org.apache.accumulo.core.client.admin.TimeType;
 import org.apache.accumulo.core.client.impl.ClientConfConverter;
 import org.apache.accumulo.core.client.impl.Credentials;
@@ -1765,10 +1765,10 @@ public class ProxyServer implements AccumuloProxy.Iface {
       org.apache.accumulo.proxy.thrift.AccumuloException,
       org.apache.accumulo.proxy.thrift.AccumuloSecurityException, TException {
     try {
-      ImportExecutorOptions loader = getConnector(login).tableOperations().addFilesTo(tableName)
+      ImportDestinationOptions loader = getConnector(login).tableOperations().addFilesTo(tableName)
           .from(importDir);
       if (setTime) {
-        loader.settingLogicalTime().load();
+        loader.tableTime().load();
       } else {
         loader.load();
       }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
index 21fc52f..817ba69 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
@@ -548,7 +548,7 @@ public class FileUtil {
       throws IOException {
 
     FileSystem ns = fs.getVolumeByPath(mapFile).getFileSystem();
-    return BulkImport.estimateSizes(acuConf, mapFile, fileSize, extents, ns);
+    return BulkImport.estimateSizes(acuConf, mapFile, fileSize, extents, ns, null);
   }
 
   public static Collection<String> toPathStrings(Collection<FileRef> refs) {
diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/ImportDirectoryCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/ImportDirectoryCommand.java
index 0429794..9b01c76 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/ImportDirectoryCommand.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/commands/ImportDirectoryCommand.java
@@ -36,6 +36,7 @@ public class ImportDirectoryCommand extends Command {
         + "arguments: <directory> true|false";
   }
 
+  @SuppressWarnings("deprecation")
   @Override
   public int execute(final String fullCommand, final CommandLine cl, final Shell shellState)
       throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
@@ -48,10 +49,10 @@ public class ImportDirectoryCommand extends Command {
     // new bulk import only takes 2 args
     if (args.length == 2) {
       setTime = Boolean.parseBoolean(cl.getArgs()[1]);
-      TableOperations.ImportExecutorOptions bulk = shellState.getConnector().tableOperations()
+      TableOperations.ImportDestinationOptions bulk = shellState.getConnector().tableOperations()
           .addFilesTo(shellState.getTableName()).from(dir);
       if (setTime)
-        bulk.settingLogicalTime().load();
+        bulk.tableTime().load();
       else
         bulk.load();
     } else if (args.length == 3) {
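For reference, a hypothetical shell invocation exercising the two-argument path above:

    importdirectory /tmp/bulk1 false

The trailing boolean selects whether tableTime() is applied before load().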
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
index 05026eb..05ba141 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.test.functional;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.SortedSet;
@@ -73,23 +74,9 @@ public class BulkFileIT extends AccumuloClusterHarness {
 
     fs.delete(new Path(dir), true);
 
-    FileSKVWriter writer1 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f1." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer1.startDefaultLocalityGroup();
-    writeData(writer1, 0, 333);
-    writer1.close();
-
-    FileSKVWriter writer2 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f2." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer2.startDefaultLocalityGroup();
-    writeData(writer2, 334, 999);
-    writer2.close();
-
-    FileSKVWriter writer3 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f3." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer3.startDefaultLocalityGroup();
-    writeData(writer3, 1000, 1999);
-    writer3.close();
+    writeData(conf, aconf, fs, dir, "f1", 0, 333);
+    writeData(conf, aconf, fs, dir, "f2", 334, 999);
+    writeData(conf, aconf, fs, dir, "f3", 1000, 1999);
 
     c.tableOperations().addFilesTo(tableName).from(dir).load();
 
@@ -99,6 +86,19 @@ public class BulkFileIT extends AccumuloClusterHarness {
 
   }
 
+  private void writeData(Configuration conf, AccumuloConfiguration aconf, FileSystem fs, String dir,
+      String file, int start, int end) throws Exception {
+    FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder()
+        .forFile(dir + "/" + file + "." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf)
+        .build();
+    writer.startDefaultLocalityGroup();
+    for (int i = start; i <= end; i++) {
+      writer.append(new Key(new Text(String.format("%04d", i))),
+          new Value(Integer.toString(i).getBytes(UTF_8)));
+    }
+    writer.close();
+  }
+
   private void verifyData(String table, int s, int e) throws Exception {
     try (Scanner scanner = getConnector().createScanner(table, Authorizations.EMPTY)) {
 
@@ -124,11 +124,4 @@ public class BulkFileIT extends AccumuloClusterHarness {
     }
   }
 
-  private void writeData(FileSKVWriter w, int s, int e) throws Exception {
-    for (int i = s; i <= e; i++) {
-      w.append(new Key(new Text(String.format("%04d", i))),
-          new Value(Integer.toString(i).getBytes(UTF_8)));
-    }
-  }
-
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
index f7f4ec4..e5ccfcf 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
@@ -45,6 +45,8 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.impl.Table;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.LoadPlan;
+import org.apache.accumulo.core.data.LoadPlan.RangeType;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
@@ -62,6 +64,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -205,7 +208,7 @@ public class BulkLoadIT extends AccumuloClusterHarness {
     }
   }
 
-  private void testBulkFile(boolean offline) throws Exception {
+  private void testBulkFile(boolean offline, boolean usePlan) throws Exception {
     Connector c = getConnector();
     addSplits(tableName, "0333 0666 0999 1333 1666");
 
@@ -238,7 +241,15 @@ public class BulkLoadIT extends AccumuloClusterHarness {
     hashes.get("1666").add(h4);
     hashes.get("null").add(h4);
 
-    c.tableOperations().addFilesTo(tableName).from(dir).load();
+    if (usePlan) {
+      LoadPlan loadPlan = LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, null, row(333))
+          .loadFileTo("f2.rf", RangeType.TABLE, row(333), row(999))
+          .loadFileTo("f3.rf", RangeType.FILE, row(1000), row(1499))
+          .loadFileTo("f4.rf", RangeType.FILE, row(1500), row(1999)).build();
+      c.tableOperations().addFilesTo(tableName).from(dir).plan(loadPlan).load();
+    } else {
+      c.tableOperations().addFilesTo(tableName).from(dir).load();
+    }
 
     if (offline)
       c.tableOperations().online(tableName);
@@ -249,12 +260,63 @@ public class BulkLoadIT extends AccumuloClusterHarness {
 
   @Test
   public void testBulkFile() throws Exception {
-    testBulkFile(false);
+    testBulkFile(false, false);
   }
 
   @Test
   public void testBulkFileOffline() throws Exception {
-    testBulkFile(true);
+    testBulkFile(true, false);
+  }
+
+  @Test
+  public void testLoadPlan() throws Exception {
+    testBulkFile(false, true);
+  }
+
+  @Test
+  public void testLoadPlanOffline() throws Exception {
+    testBulkFile(true, true);
+  }
+
+  @Test
+  public void testBadLoadPlans() throws Exception {
+    Connector c = getConnector();
+    addSplits(tableName, "0333 0666 0999 1333 1666");
+
+    String dir = getDir("/testBulkFile-");
+
+    writeData(dir + "/f1.", aconf, 0, 333);
+    writeData(dir + "/f2.", aconf, 0, 666);
+
+    // Create a plan with more files than exist in the dir
+    LoadPlan loadPlan = LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, null, row(333))
+        .loadFileTo("f2.rf", RangeType.TABLE, null, row(666))
+        .loadFileTo("f3.rf", RangeType.TABLE, null, row(666)).build();
+    try {
+      c.tableOperations().addFilesTo(tableName).from(dir).plan(loadPlan).load();
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      // ignore
+    }
+
+    // Create a plan with fewer files than exist in the dir
+    loadPlan = LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, null, row(333)).build();
+    try {
+      c.tableOperations().addFilesTo(tableName).from(dir).plan(loadPlan).load();
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      // ignore
+    }
+
+    // Create a plan with a tablet boundary that does not exist
+    loadPlan = LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, null, row(555))
+        .loadFileTo("f2.rf", RangeType.TABLE, null, row(555)).build();
+    try {
+      c.tableOperations().addFilesTo(tableName).from(dir).plan(loadPlan).load();
+      Assert.fail();
+    } catch (AccumuloException e) {
+      // ignore
+    }
   }
 
   private void addSplits(String tableName, String splitString) throws Exception {
@@ -325,6 +387,10 @@ public class BulkLoadIT extends AccumuloClusterHarness {
     }
   }
 
+  private static String row(int r) {
+    return String.format("%04d", r);
+  }
+
   private String writeData(String file, AccumuloConfiguration aconf, int s, int e)
       throws Exception {
     FileSystem fs = getCluster().getFileSystem();
@@ -333,8 +399,7 @@ public class BulkLoadIT extends AccumuloClusterHarness {
         .forFile(filename, fs, fs.getConf()).withTableConfiguration(aconf).build()) {
       writer.startDefaultLocalityGroup();
       for (int i = s; i <= e; i++) {
-        writer.append(new Key(new Text(String.format("%04d", i))),
-            new Value(Integer.toString(i).getBytes(UTF_8)));
+        writer.append(new Key(new Text(row(i))), new Value(Integer.toString(i).getBytes(UTF_8)));
       }
     }
 

