From commits-return-23899-archive-asf-public=cust-asf.ponee.io@accumulo.apache.org Sat Jun 6 00:59:01 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id EC099180665 for ; Sat, 6 Jun 2020 02:59:00 +0200 (CEST) Received: (qmail 95547 invoked by uid 500); 6 Jun 2020 00:59:00 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 95533 invoked by uid 99); 6 Jun 2020 00:58:59 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 06 Jun 2020 00:58:59 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 94083814A0; Sat, 6 Jun 2020 00:58:59 +0000 (UTC) Date: Sat, 06 Jun 2020 00:58:59 +0000 To: "commits@accumulo.apache.org" Subject: [accumulo] branch master updated: Create max tablets property in new bulk import (#1614) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <159140513945.6607.15799396608375490454@gitbox.apache.org> From: kturner@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: accumulo X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 14dd12c3a97f44cd6c0273552ce8bc3b5439cd20 X-Git-Newrev: b1e67f7ced838037965436fc1d1f255d139a4045 X-Git-Rev: b1e67f7ced838037965436fc1d1f255d139a4045 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/master by this push: new b1e67f7 Create max tablets property in new bulk import (#1614) b1e67f7 is described below commit b1e67f7ced838037965436fc1d1f255d139a4045 Author: Mike Miller AuthorDate: Fri Jun 5 20:58:50 2020 -0400 Create max tablets property in new bulk import (#1614) --- .../accumulo/core/clientImpl/bulk/BulkImport.java | 52 +++++++++----- .../org/apache/accumulo/core/conf/Property.java | 3 + .../master/tableOps/bulkVer2/PrepBulkImport.java | 57 ++++++++++++---- .../tableOps/bulkVer2/PrepBulkImportTest.java | 31 ++++++++- .../apache/accumulo/test/functional/BulkNewIT.java | 79 ++++++++++++++++++++++ 5 files changed, 189 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java index ebc3a3d..5120333 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java @@ -64,6 +64,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.crypto.CryptoServiceFactory; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; @@ -133,6 +134,14 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti SortedMap mappings; TableOperationsImpl tableOps = new TableOperationsImpl(context); + + int maxTablets = 0; + for (var prop : tableOps.getProperties(tableName)) { + if (prop.getKey().equals(Property.TABLE_BULK_MAX_TABLETS.getKey())) { + maxTablets = Integer.parseInt(prop.getValue()); + break; + } + } Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) .incrementBy(100, MILLISECONDS).maxWait(2, MINUTES).backOffFactor(1.5) .logInterval(3, TimeUnit.MINUTES).createRetry(); @@ -141,9 +150,9 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti boolean shouldRetry = true; while (shouldRetry) { if (plan == null) { - mappings = computeMappingFromFiles(fs, tableId, srcPath); + mappings = computeMappingFromFiles(fs, tableId, srcPath, maxTablets); } else { - mappings = computeMappingFromPlan(fs, tableId, srcPath); + mappings = computeMappingFromPlan(fs, tableId, srcPath, maxTablets); } if (mappings.isEmpty()) @@ -385,7 +394,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti } private SortedMap computeMappingFromPlan(FileSystem fs, TableId tableId, - Path srcPath) + Path srcPath, int maxTablets) throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException { Map> fileDestinations = @@ -422,7 +431,9 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti for (Entry> entry : fileDestinations.entrySet()) { String fileName = entry.getKey(); List destinations = entry.getValue(); - Set extents = mapDesitnationsToExtents(tableId, extentCache, destinations); + Set extents = mapDestinationsToExtents(tableId, extentCache, destinations); + log.debug("The file {} mapped to {} tablets.", fileName, extents.size()); + checkTabletCount(maxTablets, extents.size(), fileName); long estSize = (long) (fileLens.get(fileName) / (double) extents.size()); @@ -439,7 +450,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti return row == null ? null : new Text(row); } - private Set mapDesitnationsToExtents(TableId tableId, KeyExtentCache kec, + private Set mapDestinationsToExtents(TableId tableId, KeyExtentCache kec, List destinations) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { Set extents = new HashSet<>(); @@ -470,7 +481,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti } private SortedMap computeMappingFromFiles(FileSystem fs, TableId tableId, - Path dirPath) throws IOException { + Path dirPath, int maxTablets) throws IOException { Executor executor; ExecutorService service = null; @@ -486,7 +497,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti } try { - return computeFileToTabletMappings(fs, tableId, dirPath, executor, context); + return computeFileToTabletMappings(fs, tableId, dirPath, executor, context, maxTablets); } finally { if (service != null) { service.shutdown(); @@ -523,8 +534,8 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti return fileList; } - public static SortedMap computeFileToTabletMappings(FileSystem fs, - TableId tableId, Path dirPath, Executor executor, ClientContext context) throws IOException { + public SortedMap computeFileToTabletMappings(FileSystem fs, TableId tableId, + Path dirPath, Executor executor, ClientContext context, int maxTablets) throws IOException { KeyExtentCache extentCache = new ConcurrentKeyExtentCache(tableId, context); @@ -540,21 +551,22 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti CryptoService cs = CryptoServiceFactory.newDefaultInstance(); for (FileStatus fileStatus : files) { + Path filePath = fileStatus.getPath(); CompletableFuture> future = CompletableFuture.supplyAsync(() -> { try { long t1 = System.currentTimeMillis(); - List extents = findOverlappingTablets(context, extentCache, - fileStatus.getPath(), fs, fileLensCache, cs); - Map estSizes = estimateSizes(context.getConfiguration(), - fileStatus.getPath(), fileStatus.getLen(), extents, fs, fileLensCache, cs); + List extents = + findOverlappingTablets(context, extentCache, filePath, fs, fileLensCache, cs); + // make sure file isn't going to too many tablets + checkTabletCount(maxTablets, extents.size(), filePath.toString()); + Map estSizes = estimateSizes(context.getConfiguration(), filePath, + fileStatus.getLen(), extents, fs, fileLensCache, cs); Map pathLocations = new HashMap<>(); for (KeyExtent ke : extents) { - pathLocations.put(ke, - new Bulk.FileInfo(fileStatus.getPath(), estSizes.getOrDefault(ke, 0L))); + pathLocations.put(ke, new Bulk.FileInfo(filePath, estSizes.getOrDefault(ke, 0L))); } long t2 = System.currentTimeMillis(); - log.trace("Mapped {} to {} tablets in {}ms", fileStatus.getPath(), pathLocations.size(), - t2 - t1); + log.debug("Mapped {} to {} tablets in {}ms", filePath, pathLocations.size(), t2 - t1); return pathLocations; } catch (Exception e) { throw new CompletionException(e); @@ -611,4 +623,10 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti return mappings; } + + private void checkTabletCount(int tabletMaxSize, int tabletCount, String file) { + if (tabletMaxSize > 0 && tabletCount > tabletMaxSize) + throw new IllegalArgumentException("The file " + file + " attempted to import to " + + tabletCount + " tablets. Max tablets allowed set to " + tabletMaxSize); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index fc83376..494541c 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -715,6 +715,9 @@ public enum Property { + " perform specialized parsing of the key. "), TABLE_BLOOM_HASHTYPE("table.bloom.hash.type", "murmur", PropertyType.STRING, "The bloom filter hash type"), + TABLE_BULK_MAX_TABLETS("table.bulk.max.tablets", "0", PropertyType.COUNT, + "The maximum number of tablets allowed for one bulk import file. Value of 0 is Unlimited. " + + "This property is only enforced in the new bulk import API"), TABLE_DURABILITY("table.durability", "sync", PropertyType.DURABILITY, "The durability used to write to the write-ahead log. Legal values are:" + " none, which skips the write-ahead log; log, which sends the data to the" diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java index d949f21..7bc55af 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java @@ -38,10 +38,12 @@ import org.apache.accumulo.core.clientImpl.bulk.BulkSerialize; import org.apache.accumulo.core.clientImpl.bulk.LoadMappingIterator; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.fate.FateTxId; import org.apache.accumulo.fate.Repo; import org.apache.accumulo.master.Master; import org.apache.accumulo.master.tableOps.MasterRepo; @@ -59,7 +61,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Iterators; /** * Prepare bulk import directory. This REPO creates a bulk directory in Accumulo, list all the files @@ -108,18 +109,21 @@ public class PrepBulkImport extends MasterRepo { } @VisibleForTesting - static void checkForMerge(String tableId, Iterator lmi, - TabletIterFactory tabletIterFactory) throws Exception { - KeyExtent currRange = lmi.next(); + static void checkForMerge(String tableId, LoadMappingIterator lmi, + TabletIterFactory tabletIterFactory, int maxNumTablets, long tid) throws Exception { + var currRange = lmi.next(); - Text startRow = currRange.getPrevEndRow(); + Text startRow = currRange.getKey().getPrevEndRow(); Iterator tabletIter = tabletIterFactory.newTabletIter(startRow); KeyExtent currTablet = tabletIter.next(); - if (!tabletIter.hasNext() && equals(KeyExtent::getPrevEndRow, currTablet, currRange) - && equals(KeyExtent::getEndRow, currTablet, currRange)) + var fileCounts = new HashMap(); + int count; + + if (!tabletIter.hasNext() && equals(KeyExtent::getPrevEndRow, currTablet, currRange.getKey()) + && equals(KeyExtent::getEndRow, currTablet, currRange.getKey())) currRange = null; while (tabletIter.hasNext()) { @@ -131,20 +135,29 @@ public class PrepBulkImport extends MasterRepo { currRange = lmi.next(); } - while (!equals(KeyExtent::getPrevEndRow, currTablet, currRange) && tabletIter.hasNext()) { + while (!equals(KeyExtent::getPrevEndRow, currTablet, currRange.getKey()) + && tabletIter.hasNext()) { currTablet = tabletIter.next(); } - boolean matchedPrevRow = equals(KeyExtent::getPrevEndRow, currTablet, currRange); + boolean matchedPrevRow = equals(KeyExtent::getPrevEndRow, currTablet, currRange.getKey()); + count = matchedPrevRow ? 1 : 0; - while (!equals(KeyExtent::getEndRow, currTablet, currRange) && tabletIter.hasNext()) { + while (!equals(KeyExtent::getEndRow, currTablet, currRange.getKey()) + && tabletIter.hasNext()) { currTablet = tabletIter.next(); + count++; } - if (!matchedPrevRow || !equals(KeyExtent::getEndRow, currTablet, currRange)) { + if (!matchedPrevRow || !equals(KeyExtent::getEndRow, currTablet, currRange.getKey())) { break; } + if (maxNumTablets > 0) { + int fc = count; + currRange.getValue() + .forEach(fileInfo -> fileCounts.merge(fileInfo.getFileName(), fc, Integer::sum)); + } currRange = null; } @@ -153,12 +166,27 @@ public class PrepBulkImport extends MasterRepo { throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_CONCURRENT_MERGE, "Concurrent merge happened"); } + + if (maxNumTablets > 0) { + fileCounts.values().removeIf(c -> c <= maxNumTablets); + if (!fileCounts.isEmpty()) { + log.warn("{} Bulk files overlapped too many tablets : {}", FateTxId.formatTid(tid), + fileCounts); + throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, + TableOperationExceptionType.OTHER, "Files overlap the configured max (" + maxNumTablets + + ") number of tablets: " + fileCounts.keySet()); + } + } } - private void checkForMerge(final Master master) throws Exception { + private void checkForMerge(final long tid, final Master master) throws Exception { VolumeManager fs = master.getVolumeManager(); final Path bulkDir = new Path(bulkInfo.sourceDir); + + int maxTablets = Integer.parseInt(master.getContext().getTableConfiguration(bulkInfo.tableId) + .get(Property.TABLE_BULK_MAX_TABLETS)); + try (LoadMappingIterator lmi = BulkSerialize.readLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open)) { @@ -166,15 +194,14 @@ public class PrepBulkImport extends MasterRepo { .forTable(bulkInfo.tableId).overlapping(startRow, null).checkConsistency().fetch(PREV_ROW) .build(master.getContext()).stream().map(TabletMetadata::getExtent).iterator(); - checkForMerge(bulkInfo.tableId.canonical(), Iterators.transform(lmi, Map.Entry::getKey), - tabletIterFactory); + checkForMerge(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets, tid); } } @Override public Repo call(final long tid, final Master master) throws Exception { // now that table lock is acquired check that all splits in load mapping exists in table - checkForMerge(master); + checkForMerge(tid, master); bulkInfo.tableState = Tables.getTableState(master.getContext(), bulkInfo.tableId); diff --git a/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java b/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java index 88a0cc8..4d56526 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java @@ -21,6 +21,9 @@ package org.apache.accumulo.master.tableOps.bulkVer2; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -28,10 +31,15 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.stream.Collectors; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; +import org.apache.accumulo.core.clientImpl.bulk.Bulk; +import org.apache.accumulo.core.clientImpl.bulk.BulkSerialize; +import org.apache.accumulo.core.clientImpl.bulk.LoadMappingIterator; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.master.tableOps.bulkVer2.PrepBulkImport.TabletIterFactory; @@ -97,7 +105,28 @@ public class PrepBulkImportTest { return tabletRanges.subList(start, tabletRanges.size()).iterator(); }; - PrepBulkImport.checkForMerge("1", loadRanges.iterator(), tabletIterFactory); + try (LoadMappingIterator lmi = createLoadMappingIter(loadRanges)) { + PrepBulkImport.checkForMerge("1", lmi, tabletIterFactory, 100, 10001); + } + } + + private LoadMappingIterator createLoadMappingIter(List loadRanges) throws IOException { + SortedMap mapping = new TreeMap<>(); + Bulk.Files testFiles = new Bulk.Files(); + + long c = 0L; + for (String f : "f1 f2 f3".split(" ")) { + c++; + testFiles.add(new Bulk.FileInfo(f, c, c)); + } + for (KeyExtent ke : loadRanges) + mapping.put(ke, testFiles); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BulkSerialize.writeLoadMapping(mapping, "/some/dir", p -> baos); + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + LoadMappingIterator lmi = + BulkSerialize.readLoadMapping("/some/dir", TableId.of("1"), p -> bais); + return lmi; } static String toRangeStrings(Collection extents) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java index a4b2032..a2f925b 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java @@ -22,6 +22,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -41,6 +42,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import org.apache.accumulo.core.client.Accumulo; @@ -50,6 +52,7 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.crypto.CryptoServiceFactory; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.LoadPlan; @@ -183,6 +186,32 @@ public class BulkNewIT extends SharedMiniClusterBase { } } + @Test + public void testMaxTablets() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + tableName = "testMaxTablets_table1"; + NewTableConfiguration newTableConf = new NewTableConfiguration(); + // set logical time type so we can set time on bulk import + var props = Map.of(Property.TABLE_BULK_MAX_TABLETS.getKey(), "2"); + newTableConf.setProperties(props); + client.tableOperations().create(tableName, newTableConf); + + // test max tablets hit while inspecting bulk files + var thrown = assertThrows(RuntimeException.class, () -> testBulkFileMax(false)); + var c = thrown.getCause(); + assertTrue("Wrong exception: " + c, c instanceof ExecutionException); + assertTrue("Wrong exception: " + c.getCause(), + c.getCause() instanceof IllegalArgumentException); + var msg = c.getCause().getMessage(); + assertTrue("Bad File not in exception: " + msg, msg.contains("bad-file.rf")); + + // test max tablets hit using load plan on the server side + c = assertThrows(AccumuloException.class, () -> testBulkFileMax(true)); + msg = c.getMessage(); + assertTrue("Bad File not in exception: " + msg, msg.contains("bad-file.rf")); + } + } + private void testSingleTabletSingleFileNoSplits(AccumuloClient c, boolean offline) throws Exception { if (offline) { @@ -313,6 +342,56 @@ public class BulkNewIT extends SharedMiniClusterBase { } } + private void testBulkFileMax(boolean usePlan) throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + addSplits(c, tableName, "0333 0666 0999 1333 1666"); + + String dir = getDir("/testBulkFileMax-"); + + Map> hashes = new HashMap<>(); + for (String endRow : Arrays.asList("0333 0666 0999 1333 1666 null".split(" "))) { + hashes.put(endRow, new HashSet<>()); + } + + // Add a junk file, should be ignored + FSDataOutputStream out = fs.create(new Path(dir, "junk")); + out.writeChars("ABCDEFG\n"); + out.close(); + + // 1 Tablet 0333-null + String h1 = writeData(dir + "/f1.", aconf, 0, 333); + hashes.get("0333").add(h1); + + // 3 Tablets 0666-0334, 0999-0667, 1333-1000 + String h2 = writeData(dir + "/bad-file.", aconf, 334, 1333); + hashes.get("0666").add(h2); + hashes.get("0999").add(h2); + hashes.get("1333").add(h2); + + // 1 Tablet 1666-1334 + String h3 = writeData(dir + "/f3.", aconf, 1334, 1499); + hashes.get("1666").add(h3); + + // 2 Tablets 1666-1334, >1666 + String h4 = writeData(dir + "/f4.", aconf, 1500, 1999); + hashes.get("1666").add(h4); + hashes.get("null").add(h4); + + if (usePlan) { + LoadPlan loadPlan = LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, null, row(333)) + .loadFileTo("bad-file.rf", RangeType.TABLE, row(333), row(1333)) + .loadFileTo("f3.rf", RangeType.FILE, row(1334), row(1499)) + .loadFileTo("f4.rf", RangeType.FILE, row(1500), row(1999)).build(); + c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load(); + } else { + c.tableOperations().importDirectory(dir).to(tableName).load(); + } + + verifyData(c, tableName, 0, 1999, false); + verifyMetadata(c, tableName, hashes); + } + } + @Test public void testBulkFile() throws Exception { testBulkFile(false, false);