From: kturner@apache.org
To: commits@accumulo.apache.org
Reply-To: dev@accumulo.apache.org
Date: Thu, 13 Sep 2018 22:02:55 +0000 (UTC)
Subject: [accumulo] branch master updated: ACCUMULO-4813 Add ability to provide a load plan for bulk import (#607)

This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ef140e  ACCUMULO-4813 Add ability to provide a load plan for bulk import (#607)
7ef140e is described below

commit 7ef140ec40c3768859b848350db8c6d6d20f7a56
Author: Keith Turner
AuthorDate: Thu Sep 13 18:00:40 2018 -0400

    ACCUMULO-4813 Add ability to provide a load plan for bulk import (#607)
---
 .../core/client/admin/TableOperations.java          |  54 +++--
 .../accumulo/core/client/impl/BulkImport.java       | 245 +++++++++++++++++----
 .../apache/accumulo/core/conf/ClientProperty.java   |   3 +-
 .../org/apache/accumulo/core/data/LoadPlan.java     | 223 +++++++++++++++++++
 .../apache/accumulo/core/file/FileOperations.java   |  13 +-
 .../core/file/blockfile/impl/CachableBlockFile.java |   9 +-
 .../apache/accumulo/core/data/LoadPlanTest.java     | 105 +++++++++
 .../org/apache/accumulo/proxy/ProxyServer.java      |   6 +-
 .../org/apache/accumulo/server/util/FileUtil.java   |   2 +-
 .../shell/commands/ImportDirectoryCommand.java      |   5 +-
 .../accumulo/test/functional/BulkFileIT.java        |  41 ++--
 .../accumulo/test/functional/BulkLoadIT.java        |  77 ++++++-
 12 files changed, 687 insertions(+), 96 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
index 0970c17..4de0abe 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.client.rfile.RFile;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
 import org.apache.accumulo.core.client.summary.Summarizer;
 import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.data.LoadPlan;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.security.Authorizations;
@@ -620,9 +621,19 @@ public interface TableOperations {
   /**
    * @since 2.0.0
    */
-  interface ImportSourceOptions {
-    ImportSourceOptions settingLogicalTime();
+  interface ImportOptions {
+
+    /**
+     * Use the table's next timestamp to override all timestamps in imported files. The type of
+     * timestamp used depends on how the table was created.
+     *
+     * @see NewTableConfiguration#setTimeType(TimeType)
+     */
+    ImportDestinationOptions tableTime();
+
+    /**
+     * Loads the files into the table.
+     */
     void load()
         throws TableNotFoundException, IOException, AccumuloException, AccumuloSecurityException;
   }
@@ -630,28 +641,46 @@ public interface TableOperations {
   /**
    * @since 2.0.0
    */
-  interface ImportExecutorOptions extends ImportSourceOptions {
+  interface ImportDestinationOptions extends ImportOptions {
+
+    /**
+     * This is the default number of threads used to determine where to load files. A suffix of
+     * {@code C} means to multiply by the number of cores.
+     */
+    public static final String BULK_LOAD_THREADS_DEFAULT = "8C";
+
+    /**
+     * Load files in the directory to the row ranges specified in the plan. The plan should contain
+     * at least one entry for every file in the directory. When this option is specified, the files
+     * are never examined, so it is possible to send files to the wrong tablet.
+     */
+    ImportOptions plan(LoadPlan service);
+
+    // The javadoc below intentionally uses a fully qualified class name in the value tag;
+    // otherwise it would not render properly.
    /**
     * Files are examined to determine where to load them. This examination is done in the current
-     * process using multiple threads. If this property is not set, then the client property
-     * {@code bulk.threads} is used to create a thread pool.
+     * process using multiple threads. If this method is not called, then the client property
+     * {@code bulk.threads} is used to create a thread pool. This property defaults to
+     * {@value ImportDestinationOptions#BULK_LOAD_THREADS_DEFAULT}.
     *
     * @param service
     *          Use this executor to run file examination tasks
-     * @return ImportSourceOptions
     */
-    ImportSourceOptions usingExecutor(Executor service);
+    ImportOptions executor(Executor service);
+
+    // The javadoc below intentionally uses a fully qualified class name in the value tag;
+    // otherwise it would not render properly.
    /**
     * Files are examined to determine where to load them. This examination is done in the current
-     * process using multiple threads. If this property is not set, then the client property
-     * {@code bulk.threads} is used to create a thread pool.
+     * process using multiple threads. If this method is not called, then the client property
+     * {@code bulk.threads} is used to create a thread pool. This property defaults to
+     * {@value org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationOptions#BULK_LOAD_THREADS_DEFAULT}.
     *
     * @param numThreads
     *          Create a thread pool with this many threads to run file examination tasks.
-     * @return ImportSourceOptions
     */
-    ImportSourceOptions usingThreads(int numThreads);
+    ImportOptions threads(int numThreads);
  }

  /**
@@ -662,9 +691,8 @@ public interface TableOperations {
    *
    * @param directory
    *          Load files from this directory
-   * @return ImportSourceOptions
    */
-  ImportExecutorOptions from(String directory);
+  ImportDestinationOptions from(String directory);
 }

 /**
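The renamed fluent options above chain as follows; this is an illustrative usage sketch, not part of the commit, and the connector, table name, and directory are assumptions:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.accumulo.core.client.Connector;

class ImportOptionsSketch {
  static void load(Connector conn, String table, String dir) throws Exception {
    // Inspect the files with 13 threads to find their destination tablets and
    // stamp all imported entries with the table's time.
    conn.tableOperations().addFilesTo(table).from(dir).threads(13).tableTime().load();

    // Alternatively, supply an existing executor instead of a thread count.
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      conn.tableOperations().addFilesTo(table).from(dir).executor(pool).load();
    } finally {
      pool.shutdown();
    }
  }
}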
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/client/impl/BulkImport.java
index 8db4acc..7e9544c 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/BulkImport.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/BulkImport.java
@@ -17,6 +17,7 @@ package org.apache.accumulo.core.client.impl;
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.groupingBy;

 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -26,6 +27,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -39,6 +41,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.stream.Stream;

 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -47,19 +50,25 @@ import org.apache.accumulo.core.client.NamespaceExistsException;
 import org.apache.accumulo.core.client.NamespaceNotFoundException;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.admin.TableOperations.ImportExecutorOptions;
+import org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationOptions;
 import org.apache.accumulo.core.client.admin.TableOperations.ImportSourceArguments;
-import org.apache.accumulo.core.client.admin.TableOperations.ImportSourceOptions;
+import org.apache.accumulo.core.client.impl.Bulk.FileInfo;
+import org.apache.accumulo.core.client.impl.Bulk.Files;
+import org.apache.accumulo.core.client.impl.Table.ID;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.LoadPlan;
+import org.apache.accumulo.core.data.LoadPlan.Destination;
+import org.apache.accumulo.core.data.LoadPlan.RangeType;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.master.thrift.FateOperation;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.volume.VolumeConfiguration;
@@ -71,8 +80,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Sets;

-public class BulkImport implements ImportSourceArguments, ImportExecutorOptions {
+public class BulkImport implements ImportSourceArguments, ImportDestinationOptions {

   private static final Logger log = LoggerFactory.getLogger(BulkImport.class);

@@ -84,13 +96,15 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
   private final ClientContext context;
   private final String tableName;

+  private LoadPlan plan = null;
+
   BulkImport(String tableName, ClientContext context) {
     this.context = context;
     this.tableName = Objects.requireNonNull(tableName);
   }

   @Override
-  public ImportSourceOptions settingLogicalTime() {
+  public ImportDestinationOptions tableTime() {
     this.setTime = true;
     return this;
   }
@@ -109,34 +123,19 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions

     Path srcPath = checkPath(fs, dir);

-    Executor executor;
-    ExecutorService service = null;
-
-    if (this.executor != null) {
-      executor = this.executor;
-    } else if (numThreads > 0) {
-      executor = service = Executors.newFixedThreadPool(numThreads);
+    SortedMap<KeyExtent,Files> mappings;
+    if (plan == null) {
+      mappings = computeMappingFromFiles(fs, tableId, srcPath);
     } else {
-      String threads = context.getConfiguration().get(ClientProperty.BULK_LOAD_THREADS.getKey());
-      executor = service = Executors
-          .newFixedThreadPool(ConfigurationTypeHelper.getNumThreads(threads));
+      mappings = computeMappingFromPlan(fs, tableId, srcPath);
     }

-    try {
-      SortedMap<KeyExtent,Files> mappings = computeFileToTabletMappings(fs, tableId, srcPath,
-          executor, context);
-
-      BulkSerialize.writeLoadMapping(mappings, srcPath.toString(), fs::create);
+    BulkSerialize.writeLoadMapping(mappings, srcPath.toString(), fs::create);

-      List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getUtf8()),
-          ByteBuffer.wrap(srcPath.toString().getBytes(UTF_8)),
-          ByteBuffer.wrap((setTime + "").getBytes(UTF_8)));
-      doFateOperation(FateOperation.TABLE_BULK_IMPORT2, args, Collections.emptyMap(), tableName);
-    } finally {
-      if (service != null) {
-        service.shutdown();
-      }
-    }
+    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getUtf8()),
+        ByteBuffer.wrap(srcPath.toString().getBytes(UTF_8)),
+        ByteBuffer.wrap((setTime + "").getBytes(UTF_8)));
+    doFateOperation(FateOperation.TABLE_BULK_IMPORT2, args, Collections.emptyMap(), tableName);
   }

   /**
@@ -164,17 +163,20 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
       throw new AccumuloException(
           "Bulk import directory " + dir + " does not exist or has bad permissions", fnf);
     }
+
+    // TODO ensure dir does not contain bulk load mapping
+
     return ret;
   }

   @Override
-  public ImportSourceOptions usingExecutor(Executor service) {
+  public ImportDestinationOptions executor(Executor service) {
     this.executor = Objects.requireNonNull(service);
     return this;
   }

   @Override
-  public ImportSourceOptions usingThreads(int numThreads) {
+  public ImportDestinationOptions threads(int numThreads) {
     Preconditions.checkArgument(numThreads > 0, "Non positive number of threads given : %s",
         numThreads);
     this.numThreads = numThreads;
@@ -182,7 +184,13 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
   }

   @Override
-  public ImportExecutorOptions from(String directory) {
+  public ImportDestinationOptions plan(LoadPlan plan) {
+    this.plan = plan;
+    return this;
+  }
+
+  @Override
+  public ImportDestinationOptions from(String directory) {
     this.dir = Objects.requireNonNull(directory);
     return this;
   }
@@ -198,7 +206,12 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
   }

   public static Map<KeyExtent,Long> estimateSizes(AccumuloConfiguration acuConf, Path mapFile,
-      long fileSize, Collection<KeyExtent> extents, FileSystem ns) throws IOException {
+      long fileSize, Collection<KeyExtent> extents, FileSystem ns, Cache<String,Long> fileLenCache)
+      throws IOException {
+
+    if (extents.size() == 1) {
+      return Collections.singletonMap(extents.iterator().next(), fileSize);
+    }

     long totalIndexEntries = 0;
     Map<KeyExtent,MLong> counts = new TreeMap<>();
@@ -208,7 +221,8 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
     Text row = new Text();

     FileSKVIterator index = FileOperations.getInstance().newIndexReaderBuilder()
-        .forFile(mapFile.toString(), ns, ns.getConf()).withTableConfiguration(acuConf).build();
+        .forFile(mapFile.toString(), ns, ns.getConf()).withTableConfiguration(acuConf)
+        .withFileLenCache(fileLenCache).build();

     try {
       while (index.hasTop()) {
@@ -249,8 +263,12 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
   }

   public static List<KeyExtent> findOverlappingTablets(ClientContext context,
-      KeyExtentCache extentCache, Text startRow, Text endRow, FileSKVIterator reader)
+      KeyExtentCache extentCache, FileSKVIterator reader)
       throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
+
+    Text startRow = null;
+    Text endRow = null;
+
     List<KeyExtent> result = new ArrayList<>();
     Collection<ByteSequence> columnFamilies = Collections.emptyList();
     Text row = startRow;
@@ -269,8 +287,7 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
       result.add(extent);
       row = extent.getEndRow();
       if (row != null && (endRow == null || row.compareTo(endRow) < 0)) {
-        row = new Text(row);
-        row.append(byte0, 0, byte0.length);
+        row = nextRow(row);
       } else
         break;
     }
@@ -278,14 +295,156 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
     return result;
   }

+  private static Text nextRow(Text row) {
+    Text next = new Text(row);
+    next.append(byte0, 0, byte0.length);
+    return next;
+  }
+
   public static List<KeyExtent> findOverlappingTablets(ClientContext context,
-      KeyExtentCache extentCache, Path file, FileSystem fs)
+      KeyExtentCache extentCache, Path file, FileSystem fs, Cache<String,Long> fileLenCache)
       throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
     try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
         .forFile(file.toString(), fs, fs.getConf())
-        .withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) {
-      return findOverlappingTablets(context, extentCache, null, null, reader);
+        .withTableConfiguration(context.getConfiguration()).withFileLenCache(fileLenCache)
+        .seekToBeginning().build()) {
+      return findOverlappingTablets(context, extentCache, reader);
     }
   }
+
+  private static Map<String,Long> getFileLenMap(FileStatus[] statuses) {
+    HashMap<String,Long> fileLens = new HashMap<>();
+    for (FileStatus status : statuses) {
+      fileLens.put(status.getPath().getName(), status.getLen());
+    }
+
+    return fileLens;
+  }
+
+  private static Cache<String,Long> getPopulatedFileLenCache(Path dir, FileStatus[] statuses) {
+    Map<String,Long> fileLens = getFileLenMap(statuses);
+
+    Map<String,Long> absFileLens = new HashMap<>();
+    fileLens.forEach((k, v) -> {
+      absFileLens.put(CachableBlockFile.pathToCacheId(new Path(dir, k)), v);
+    });
+
+    Cache<String,Long> fileLenCache = CacheBuilder.newBuilder().build();
+
+    fileLenCache.putAll(absFileLens);
+
+    return fileLenCache;
+  }
+
+  private SortedMap<KeyExtent,Files> computeMappingFromPlan(FileSystem fs, ID tableId, Path srcPath)
+      throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
+
+    Map<String,List<Destination>> fileDestinations = plan.getDestinations().stream()
+        .collect(groupingBy(Destination::getFileName));
+
+    FileStatus[] statuses = fs.listStatus(srcPath,
+        p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING));
+
+    Map<String,Long> fileLens = getFileLenMap(statuses);
+
+    if (!fileDestinations.keySet().equals(fileLens.keySet())) {
+      throw new IllegalArgumentException(
+          "Load plan files differ from directory files, symmetric difference : "
+              + Sets.symmetricDifference(fileDestinations.keySet(), fileLens.keySet()));
+    }
+
+    KeyExtentCache extentCache = new ConcurrentKeyExtentCache(tableId, context);
+
+    // Pre-populate cache by looking up all end rows in sorted order. Doing this in sorted order
+    // leverages read ahead.
+    fileDestinations.values().stream().flatMap(List::stream)
+        .filter(dest -> dest.getRangeType() == RangeType.FILE)
+        .flatMap(dest -> Stream.of(dest.getStartRow(), dest.getEndRow())).filter(row -> row != null)
+        .map(Text::new).sorted().distinct().forEach(row -> {
+          try {
+            extentCache.lookup(row);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+    SortedMap<KeyExtent,Files> mapping = new TreeMap<>();
+
+    for (Entry<String,List<Destination>> entry : fileDestinations.entrySet()) {
+      String fileName = entry.getKey();
+      List<Destination> destinations = entry.getValue();
+      Set<KeyExtent> extents = mapDestinationsToExtents(tableId, extentCache, destinations);
+
+      long estSize = (long) (fileLens.get(fileName) / (double) extents.size());
+
+      for (KeyExtent keyExtent : extents) {
+        mapping.computeIfAbsent(keyExtent, k -> new Files())
+            .add(new FileInfo(fileName, estSize, 0));
+      }
+    }
+
+    return mergeOverlapping(mapping);
+  }
+
+  private Text toText(byte[] row) {
+    return row == null ? null : new Text(row);
+  }
+
+  private Set<KeyExtent> mapDestinationsToExtents(Table.ID tableId, KeyExtentCache kec,
+      List<Destination> destinations)
+      throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    Set<KeyExtent> extents = new HashSet<>();
+
+    for (Destination dest : destinations) {
+
+      if (dest.getRangeType() == RangeType.TABLE) {
+        extents.add(new KeyExtent(tableId, toText(dest.getEndRow()), toText(dest.getStartRow())));
+      } else if (dest.getRangeType() == RangeType.FILE) {
+        Text startRow = new Text(dest.getStartRow());
+        Text endRow = new Text(dest.getEndRow());
+
+        KeyExtent extent = kec.lookup(startRow);
+
+        extents.add(extent);
+
+        while (!extent.contains(endRow) && extent.getEndRow() != null) {
+          extent = kec.lookup(nextRow(extent.getEndRow()));
+          extents.add(extent);
+        }
+
+      } else {
+        throw new IllegalStateException();
+      }
+    }
+
+    return extents;
+  }
+
+  private SortedMap<KeyExtent,Files> computeMappingFromFiles(FileSystem fs, Table.ID tableId,
+      Path dirPath) throws IOException {
+
+    Executor executor;
+    ExecutorService service = null;
+
+    if (this.executor != null) {
+      executor = this.executor;
+    } else if (numThreads > 0) {
+      executor = service = Executors.newFixedThreadPool(numThreads);
+    } else {
+      String threads = context.getConfiguration().get(ClientProperty.BULK_LOAD_THREADS.getKey());
+      executor = service = Executors
+          .newFixedThreadPool(ConfigurationTypeHelper.getNumThreads(threads));
+    }
+
+    try {
+      return computeFileToTabletMappings(fs, tableId, dirPath, executor, context);
+    } finally {
+      if (service != null) {
+        service.shutdown();
+      }
+    }
+  }

   public static SortedMap<KeyExtent,Files> computeFileToTabletMappings(FileSystem fs,
@@ -296,6 +455,10 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
     FileStatus[] files = fs.listStatus(dirPath,
         p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING));

+    // we know all of the file lens, so construct a cache and populate it in order to avoid later
+    // trips to the namenode
+    Cache<String,Long> fileLensCache = getPopulatedFileLenCache(dirPath, files);
+
     List<CompletableFuture<Map<KeyExtent,Bulk.FileInfo>>> futures = new ArrayList<>();

     for (FileStatus fileStatus : files) {
@@ -303,9 +466,9 @@ public class BulkImport implements ImportSourceArguments, ImportExecutorOptions
       try {
         long t1 = System.currentTimeMillis();
         List<KeyExtent> extents = findOverlappingTablets(context, extentCache,
-            fileStatus.getPath(), fs);
+            fileStatus.getPath(), fs, fileLensCache);
         Map<KeyExtent,Long> estSizes = estimateSizes(context.getConfiguration(),
-            fileStatus.getPath(), fileStatus.getLen(), extents, fs);
+            fileStatus.getPath(), fileStatus.getLen(), extents, fs, fileLensCache);
         Map<KeyExtent,Bulk.FileInfo> pathLocations = new HashMap<>();
         for (KeyExtent ke : extents) {
           pathLocations.put(ke,
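The cache plumbing added above exists so that a single listStatus() call supplies every file length up front, sparing a namenode round trip each time a file is later opened. A condensed, self-contained sketch of the same idea (the directory and filesystem are assumed):

import java.io.IOException;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class FileLenCacheSketch {
  // Populate a Guava cache keyed exactly the way the block file reader looks
  // up lengths, so getFileStatus() never needs to be called again per file.
  static Cache<String,Long> populate(FileSystem fs, Path dir) throws IOException {
    Cache<String,Long> cache = CacheBuilder.newBuilder().build();
    for (FileStatus status : fs.listStatus(dir)) {
      cache.put(CachableBlockFile.pathToCacheId(status.getPath()), status.getLen());
    }
    return cache;
  }
}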
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
index 0b47e7f..a5229f6 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
@@ -25,6 +25,7 @@ import java.util.Objects;
 import java.util.Properties;

 import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationOptions;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.CredentialProviderToken;
 import org.apache.accumulo.core.client.security.tokens.DelegationToken;
@@ -70,7 +71,7 @@ public enum ClientProperty {
       "Number of concurrent query threads to spawn for querying"),

   // Bulk load
-  BULK_LOAD_THREADS("bulk.threads", "8C",
+  BULK_LOAD_THREADS("bulk.threads", ImportDestinationOptions.BULK_LOAD_THREADS_DEFAULT,
       "The number of threads used to inspect bulk load files to determine where files go. "
           + "If the value ends with C, then it will be multiplied by the number of cores on the "
           + "system. This property is only used by the bulk import API introduced in 2.0.0."),

diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java
new file mode 100644
index 0000000..19e4659
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.data;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationOptions;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.UnsignedBytes;
+
+/**
+ * Information about where to load files into an Accumulo table.
+ *
+ * @see ImportDestinationOptions#plan(LoadPlan)
+ * @since 2.0.0
+ */
+public class LoadPlan {
+  private final ImmutableList<Destination> destinations;
+
+  private static byte[] copy(byte[] data) {
+    return data == null ? null : Arrays.copyOf(data, data.length);
+  }
+
+  private static byte[] copy(Text data) {
+    return data == null ? null : data.copyBytes();
+  }
+
+  private static byte[] copy(CharSequence data) {
+    return data == null ? null : data.toString().getBytes(StandardCharsets.UTF_8);
+  }
+
+  private static String checkFileName(String fileName) {
+    Preconditions.checkArgument(Paths.get(fileName).getNameCount() == 1,
+        "Expected only filename, but got %s", fileName);
+    return fileName;
+  }
+
+  /**
+   * @since 2.0.0
+   */
+  public enum RangeType {
+    /**
+     * Ranges that correspond to one or more tablets in a table. For a range of this type, the
+     * start row and end row can be null. The start row is exclusive and the end row is inclusive
+     * (like Accumulo tablets). A common use case for this would be when files were partitioned
+     * using a table's splits. When using this range type, the start and end row must exist as
+     * splits in the table or an exception will be thrown at load time.
+     */
+    TABLE,
+    /**
+     * Ranges that correspond to known rows in a file. For this range type, the start row and end
+     * row must be non-null. The start row and end row are both considered inclusive. At load
+     * time, these data ranges will be mapped to tablet ranges.
+     */
+    FILE
+  }
+
+  /**
+   * Mapping of a file to a row range with an associated range type.
+   *
+   * @since 2.0.0
+   */
+  public static class Destination {
+
+    private final String fileName;
+    private final byte[] startRow;
+    private final byte[] endRow;
+    private final RangeType rangeType;
+
+    private byte[] checkRow(RangeType type, byte[] row) {
+      if (type == RangeType.FILE && row == null) {
+        throw new IllegalArgumentException(
+            "Row can not be null when range type is " + RangeType.FILE);
+      }
+      return row;
+    }
+
+    private Destination(String fileName, RangeType rangeType, byte[] startRow, byte[] endRow) {
+      this.fileName = checkFileName(fileName);
+      this.rangeType = rangeType;
+      this.startRow = checkRow(rangeType, startRow);
+      this.endRow = checkRow(rangeType, endRow);
+
+      if (rangeType == RangeType.FILE) {
+        if (UnsignedBytes.lexicographicalComparator().compare(startRow, endRow) > 0) {
+          String srs = new String(startRow, StandardCharsets.UTF_8);
+          String ers = new String(endRow, StandardCharsets.UTF_8);
+          throw new IllegalArgumentException(
+              "Start row is greater than end row : " + srs + " " + ers);
+        }
+      } else if (rangeType == RangeType.TABLE) {
+        if (startRow != null && endRow != null
+            && UnsignedBytes.lexicographicalComparator().compare(startRow, endRow) >= 0) {
+          String srs = new String(startRow, StandardCharsets.UTF_8);
+          String ers = new String(endRow, StandardCharsets.UTF_8);
+          throw new IllegalArgumentException(
+              "Start row is greater than or equal to end row : " + srs + " " + ers);
+        }
+      } else {
+        throw new RuntimeException();
+      }
+    }
+
+    public String getFileName() {
+      return fileName;
+    }
+
+    public byte[] getStartRow() {
+      return copy(startRow);
+    }
+
+    public byte[] getEndRow() {
+      return copy(endRow);
+    }
+
+    public RangeType getRangeType() {
+      return rangeType;
+    }
+  }
+
+  private LoadPlan(ImmutableList<Destination> destinations) {
+    this.destinations = destinations;
+  }
+
+  public Collection<Destination> getDestinations() {
+    return destinations;
+  }
+
+  /**
+   * @since 2.0.0
+   */
+  public interface Builder {
+    /**
+     * Specify the row range where a file should be loaded.
+     *
+     * @param fileName
+     *          this should not be a path, only a file name, because loads are expected to happen
+     *          from a single directory
+     */
+    Builder loadFileTo(String fileName, RangeType rangeType, Text startRow, Text endRow);
+
+    /**
+     * Specify the row range where a file should be loaded.
+     *
+     * @param fileName
+     *          this should not be a path, only a file name, because loads are expected to happen
+     *          from a single directory
+     */
+    Builder loadFileTo(String fileName, RangeType rangeType, byte[] startRow, byte[] endRow);
+
+    /**
+     * Specify the row range where a file should be loaded.
+     *
+     * @param fileName
+     *          this should not be a path, only a file name, because loads are expected to happen
+     *          from a single directory
+     */
+    Builder loadFileTo(String fileName, RangeType rangeType, CharSequence startRow,
+        CharSequence endRow);
+
+    Builder addPlan(LoadPlan plan);
+
+    LoadPlan build();
+  }
+
+  public static Builder builder() {
+    return new Builder() {
+      ImmutableList.Builder<Destination> fmb = ImmutableList.builder();
+
+      @Override
+      public Builder loadFileTo(String fileName, RangeType rangeType, Text startRow, Text endRow) {
+        fmb.add(new Destination(fileName, rangeType, copy(startRow), copy(endRow)));
+        return this;
+      }
+
+      @Override
+      public Builder loadFileTo(String fileName, RangeType rangeType, byte[] startRow,
+          byte[] endRow) {
+        fmb.add(new Destination(fileName, rangeType, copy(startRow), copy(endRow)));
+        return this;
+      }
+
+      @Override
+      public Builder loadFileTo(String fileName, RangeType rangeType, CharSequence startRow,
+          CharSequence endRow) {
+        fmb.add(new Destination(fileName, rangeType, copy(startRow), copy(endRow)));
+        return this;
+      }
+
+      @Override
+      public Builder addPlan(LoadPlan plan) {
+        fmb.addAll(plan.getDestinations());
+        return this;
+      }
+
+      @Override
+      public LoadPlan build() {
+        return new LoadPlan(fmb.build());
+      }
+    };
+  }
+}
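To make the two range types concrete, here is a hypothetical plan for a table with a split at row 0333; the file names and rows are assumptions, not taken from the commit:

import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.LoadPlan.RangeType;

class LoadPlanSketch {
  static LoadPlan example() {
    return LoadPlan.builder()
        // TABLE range (null, "0333"]: start row exclusive, end row inclusive,
        // null meaning -infinity; rows must match existing tablet boundaries.
        .loadFileTo("f1.rf", RangeType.TABLE, null, "0333")
        // FILE range ["0334", "0999"]: the first and last rows actually in the
        // file, both inclusive; mapped to overlapping tablets at load time.
        .loadFileTo("f2.rf", RangeType.FILE, "0334", "0999")
        .build();
  }
}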
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index cfa4d73..1ff0e22 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -317,9 +317,9 @@ public abstract class FileOperations {
           true);
     }

-    protected FileOptions toIndexReaderBuilderOptions() {
+    protected FileOptions toIndexReaderBuilderOptions(Cache<String,Long> fileLenCache) {
       return new FileOptions(tableConfiguration, filename, fs, fsConf, rateLimiter, null, null,
-          false, null, null, null, false, cryptoService, null, null, true);
+          false, null, null, fileLenCache, false, cryptoService, null, null, true);
     }

     protected FileOptions toScanReaderBuilderOptions(Range range, Set<ByteSequence> columnFamilies,
@@ -484,6 +486,8 @@ public abstract class FileOperations {
    */
   public class IndexReaderBuilder extends FileHelper implements IndexReaderTableConfiguration {

+    private Cache<String,Long> fileLenCache = null;
+
     public IndexReaderTableConfiguration forFile(String filename, FileSystem fs,
         Configuration fsConf) {
       filename(filename).fs(fs).fsConf(fsConf);
@@ -496,8 +498,13 @@ public abstract class FileOperations {
       return this;
     }

+    public IndexReaderBuilder withFileLenCache(Cache<String,Long> fileLenCache) {
+      this.fileLenCache = fileLenCache;
+      return this;
+    }
+
     public FileSKVIterator build() throws IOException {
-      return openIndex(toIndexReaderBuilderOptions());
+      return openIndex(toIndexReaderBuilderOptions(fileLenCache));
     }
   }

diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index eaef448..b13ccbe 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -62,6 +62,10 @@ public class CachableBlockFile {
     T get() throws IOException;
   }

+  public static String pathToCacheId(Path p) {
+    return p.toString();
+  }
+
   /**
    * Class wraps the BCFile reader.
    */
@@ -325,8 +329,9 @@ public class CachableBlockFile {
         BlockCache data, BlockCache index, RateLimiter readLimiter,
         AccumuloConfiguration accumuloConfiguration, CryptoService cryptoService)
         throws IOException {
-      this(dataFile.toString(), () -> fs.open(dataFile), () -> fs.getFileStatus(dataFile).getLen(),
-          fileLenCache, data, index, readLimiter, conf, accumuloConfiguration, cryptoService);
+      this(pathToCacheId(dataFile), () -> fs.open(dataFile),
+          () -> fs.getFileStatus(dataFile).getLen(), fileLenCache, data, index, readLimiter, conf,
+          accumuloConfiguration, cryptoService);
     }

     public Reader(String cacheId,

diff --git a/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java b/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java
new file mode 100644
index 0000000..fd619d1
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.data;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toSet;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.LoadPlan.Destination;
+import org.apache.accumulo.core.data.LoadPlan.RangeType;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class LoadPlanTest {
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadRange1() {
+    LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, "a", "a").build();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadRange2() {
+    LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, "b", "a").build();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadRange3() {
+    LoadPlan.builder().loadFileTo("f1.rf", RangeType.FILE, "b", "a").build();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadRange4() {
+    LoadPlan.builder().loadFileTo("f1.rf", RangeType.FILE, null, "a").build();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadRange5() {
+    LoadPlan.builder().loadFileTo("f1.rf", RangeType.FILE, "a", null).build();
+  }
+
+  @Test
+  public void testTypes() {
+    LoadPlan loadPlan = LoadPlan.builder().loadFileTo("f1.rf", RangeType.FILE, "1112", "1145")
+        .loadFileTo("f2.rf", RangeType.FILE, "abc".getBytes(UTF_8), "def".getBytes(UTF_8))
+        .loadFileTo("f3.rf", RangeType.FILE, new Text("368"), new Text("479"))
+        .loadFileTo("f4.rf", RangeType.TABLE, null, "aaa")
+        .loadFileTo("f5.rf", RangeType.TABLE, "yyy", null)
+        .loadFileTo("f6.rf", RangeType.TABLE, null, "bbb".getBytes(UTF_8))
+        .loadFileTo("f7.rf", RangeType.TABLE, "www".getBytes(UTF_8), null)
+        .loadFileTo("f8.rf", RangeType.TABLE, null, new Text("ccc"))
+        .loadFileTo("f9.rf", RangeType.TABLE, new Text("xxx"), null)
+        .loadFileTo("fa.rf", RangeType.TABLE, "1138", "1147")
+        .loadFileTo("fb.rf", RangeType.TABLE, "heg".getBytes(UTF_8), "klt".getBytes(UTF_8))
+        .loadFileTo("fc.rf", RangeType.TABLE, new Text("agt"), new Text("ctt"))
+        .addPlan(
+            LoadPlan.builder().loadFileTo("fd.rf", RangeType.TABLE, (String) null, null).build())
+        .build();
+
+    Set<String> expected = new HashSet<>();
+    expected.add("f1.rf:DATA:1112:1145");
+    expected.add("f2.rf:DATA:abc:def");
+    expected.add("f3.rf:DATA:368:479");
+    expected.add("f4.rf:TABLET:null:aaa");
+    expected.add("f5.rf:TABLET:yyy:null");
+    expected.add("f6.rf:TABLET:null:bbb");
+    expected.add("f7.rf:TABLET:www:null");
+    expected.add("f8.rf:TABLET:null:ccc");
+    expected.add("f9.rf:TABLET:xxx:null");
+    expected.add("fa.rf:TABLET:1138:1147");
+    expected.add("fb.rf:TABLET:heg:klt");
+    expected.add("fc.rf:TABLET:agt:ctt");
+    expected.add("fd.rf:TABLET:null:null");
+
+    Set<String> actual = loadPlan.getDestinations().stream().map(LoadPlanTest::toString)
+        .collect(toSet());
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  private static String toString(Destination d) {
+    return d.getFileName() + ":" + d.getRangeType() + ":" + toString(d.getStartRow()) + ":"
+        + toString(d.getEndRow());
+  }
+
+  private static String toString(byte[] r) {
+    return r == null ? null : new String(r, UTF_8);
+  }
+}

diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
index 947d401..9a7caa7 100644
--- a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
@@ -59,7 +59,7 @@ import org.apache.accumulo.core.client.admin.ActiveCompaction;
 import org.apache.accumulo.core.client.admin.ActiveScan;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
-import org.apache.accumulo.core.client.admin.TableOperations.ImportExecutorOptions;
+import org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationOptions;
 import org.apache.accumulo.core.client.admin.TimeType;
 import org.apache.accumulo.core.client.impl.ClientConfConverter;
 import org.apache.accumulo.core.client.impl.Credentials;
@@ -1765,10 +1765,10 @@ public class ProxyServer implements AccumuloProxy.Iface {
       org.apache.accumulo.proxy.thrift.AccumuloException,
       org.apache.accumulo.proxy.thrift.AccumuloSecurityException, TException {
     try {
-      ImportExecutorOptions loader = getConnector(login).tableOperations().addFilesTo(tableName)
+      ImportDestinationOptions loader = getConnector(login).tableOperations().addFilesTo(tableName)
           .from(importDir);
       if (setTime) {
-        loader.settingLogicalTime().load();
+        loader.tableTime().load();
       } else {
         loader.load();
       }

diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
index 21fc52f..817ba69 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
@@ -548,7 +548,7 @@ public class FileUtil {
       throws IOException {
     FileSystem ns = fs.getVolumeByPath(mapFile).getFileSystem();

-    return BulkImport.estimateSizes(acuConf, mapFile, fileSize, extents, ns);
+    return BulkImport.estimateSizes(acuConf, mapFile, fileSize, extents, ns, null);
   }

   public static Collection<String> toPathStrings(Collection<FileRef> refs) {
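The withFileLenCache() hook added to the index reader builder above can be exercised directly; a sketch under the assumption that the file's path and length are already known:

import java.io.IOException;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class IndexReaderSketch {
  static FileSKVIterator openIndex(Path path, long len, FileSystem fs,
      AccumuloConfiguration aconf) throws IOException {
    Cache<String,Long> lens = CacheBuilder.newBuilder().build();
    // The key must match what the reader looks up, hence pathToCacheId().
    lens.put(CachableBlockFile.pathToCacheId(path), len);
    return FileOperations.getInstance().newIndexReaderBuilder()
        .forFile(path.toString(), fs, fs.getConf()).withTableConfiguration(aconf)
        .withFileLenCache(lens).build();
  }
}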
diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/ImportDirectoryCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/ImportDirectoryCommand.java
index 0429794..9b01c76 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/ImportDirectoryCommand.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/commands/ImportDirectoryCommand.java
@@ -36,6 +36,7 @@ public class ImportDirectoryCommand extends Command {
         + "arguments: true|false";
   }

+  @SuppressWarnings("deprecation")
   @Override
   public int execute(final String fullCommand, final CommandLine cl, final Shell shellState)
       throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
@@ -48,10 +49,10 @@ public class ImportDirectoryCommand extends Command {
     // new bulk import only takes 2 args
     if (args.length == 2) {
       setTime = Boolean.parseBoolean(cl.getArgs()[1]);
-      TableOperations.ImportExecutorOptions bulk = shellState.getConnector().tableOperations()
+      TableOperations.ImportDestinationOptions bulk = shellState.getConnector().tableOperations()
           .addFilesTo(shellState.getTableName()).from(dir);
       if (setTime)
-        bulk.settingLogicalTime().load();
+        bulk.tableTime().load();
       else
         bulk.load();
     } else if (args.length == 3) {

diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
index 05026eb..05ba141 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.test.functional;

 import static java.nio.charset.StandardCharsets.UTF_8;

+import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.SortedSet;
@@ -73,23 +74,9 @@ public class BulkFileIT extends AccumuloClusterHarness {

     fs.delete(new Path(dir), true);

-    FileSKVWriter writer1 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f1." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer1.startDefaultLocalityGroup();
-    writeData(writer1, 0, 333);
-    writer1.close();
-
-    FileSKVWriter writer2 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f2." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer2.startDefaultLocalityGroup();
-    writeData(writer2, 334, 999);
-    writer2.close();
-
-    FileSKVWriter writer3 = FileOperations.getInstance().newWriterBuilder()
-        .forFile(dir + "/f3." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf).build();
-    writer3.startDefaultLocalityGroup();
-    writeData(writer3, 1000, 1999);
-    writer3.close();
+    writeData(conf, aconf, fs, dir, "f1", 0, 333);
+    writeData(conf, aconf, fs, dir, "f2", 334, 999);
+    writeData(conf, aconf, fs, dir, "f3", 1000, 1999);

     c.tableOperations().addFilesTo(tableName).from(dir).load();

@@ -99,6 +86,19 @@ public class BulkFileIT extends AccumuloClusterHarness {

   }

+  private void writeData(Configuration conf, AccumuloConfiguration aconf, FileSystem fs,
+      String dir, String file, int start, int end) throws Exception {
+    FileSKVWriter writer1 = FileOperations.getInstance().newWriterBuilder()
+        .forFile(dir + "/" + file + "." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf)
+        .build();
+    writer1.startDefaultLocalityGroup();
+    for (int i = start; i <= end; i++) {
+      writer1.append(new Key(new Text(String.format("%04d", i))),
+          new Value(Integer.toString(i).getBytes(UTF_8)));
+    }
+    writer1.close();
+  }
+
   private void verifyData(String table, int s, int e) throws Exception {
     try (Scanner scanner = getConnector().createScanner(table, Authorizations.EMPTY)) {

@@ -124,11 +124,4 @@ public class BulkFileIT extends AccumuloClusterHarness {
     }
   }

-  private void writeData(FileSKVWriter w, int s, int e) throws Exception {
-    for (int i = s; i <= e; i++) {
-      w.append(new Key(new Text(String.format("%04d", i))),
-          new Value(Integer.toString(i).getBytes(UTF_8)));
-    }
-  }
-
 }

diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
index f7f4ec4..e5ccfcf 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
@@ -45,6 +45,8 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.impl.Table;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.LoadPlan;
+import org.apache.accumulo.core.data.LoadPlan.RangeType;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
@@ -62,6 +64,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;

@@ -205,7 +208,7 @@ public class BulkLoadIT extends AccumuloClusterHarness {
     }
   }

-  private void testBulkFile(boolean offline) throws Exception {
+  private void testBulkFile(boolean offline, boolean usePlan) throws Exception {
     Connector c = getConnector();
     addSplits(tableName, "0333 0666 0999 1333 1666");

@@ -238,7 +241,15 @@ public class BulkLoadIT extends AccumuloClusterHarness {
     hashes.get("1666").add(h4);
     hashes.get("null").add(h4);

-    c.tableOperations().addFilesTo(tableName).from(dir).load();
+    if (usePlan) {
+      LoadPlan loadPlan = LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, null, row(333))
+          .loadFileTo("f2.rf", RangeType.TABLE, row(333), row(999))
+          .loadFileTo("f3.rf", RangeType.FILE, row(1000), row(1499))
+          .loadFileTo("f4.rf", RangeType.FILE, row(1500), row(1999)).build();
+      c.tableOperations().addFilesTo(tableName).from(dir).plan(loadPlan).load();
+    } else {
+      c.tableOperations().addFilesTo(tableName).from(dir).load();
+    }

     if (offline)
       c.tableOperations().online(tableName);

@@ -249,12 +260,63 @@ public class BulkLoadIT extends AccumuloClusterHarness {

   @Test
   public void testBulkFile() throws Exception {
-    testBulkFile(false);
+    testBulkFile(false, false);
   }

   @Test
   public void testBulkFileOffline() throws Exception {
-    testBulkFile(true);
+    testBulkFile(true, false);
+  }
+
+  @Test
+  public void testLoadPlan() throws Exception {
+    testBulkFile(false, true);
+  }
+
+  @Test
+  public void testLoadPlanOffline() throws Exception {
+    testBulkFile(true, true);
+  }
+
+  @Test
+  public void testBadLoadPlans() throws Exception {
+    Connector c = getConnector();
+    addSplits(tableName, "0333 0666 0999 1333 1666");
+
+    String dir = getDir("/testBulkFile-");
+
+    writeData(dir + "/f1.", aconf, 0, 333);
+    writeData(dir + "/f2.", aconf, 0, 666);
+
+    // Create a plan with more files than exist in dir
+    LoadPlan loadPlan = LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, null, row(333))
+        .loadFileTo("f2.rf", RangeType.TABLE, null, row(666))
+        .loadFileTo("f3.rf", RangeType.TABLE, null, row(666)).build();
+    try {
+      c.tableOperations().addFilesTo(tableName).from(dir).plan(loadPlan).load();
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      // ignore
+    }
+
+    // Create a plan with fewer files than exist in dir
+    loadPlan = LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, null, row(333)).build();
+    try {
+      c.tableOperations().addFilesTo(tableName).from(dir).plan(loadPlan).load();
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      // ignore
+    }
+
+    // Create a plan with a tablet boundary that does not exist
+    loadPlan = LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, null, row(555))
+        .loadFileTo("f2.rf", RangeType.TABLE, null, row(555)).build();
+    try {
+      c.tableOperations().addFilesTo(tableName).from(dir).plan(loadPlan).load();
+      Assert.fail();
+    } catch (AccumuloException e) {
+      // ignore
+    }
   }

   private void addSplits(String tableName, String splitString) throws Exception {
@@ -325,6 +387,10 @@ public class BulkLoadIT extends AccumuloClusterHarness {
     }
   }

+  private static String row(int r) {
+    return String.format("%04d", r);
+  }
+
   private String writeData(String file, AccumuloConfiguration aconf, int s, int e)
       throws Exception {
     FileSystem fs = getCluster().getFileSystem();
@@ -333,8 +399,7 @@ public class BulkLoadIT extends AccumuloClusterHarness {
         .forFile(filename, fs, fs.getConf()).withTableConfiguration(aconf).build()) {
       writer.startDefaultLocalityGroup();
       for (int i = s; i <= e; i++) {
-        writer.append(new Key(new Text(String.format("%04d", i))),
-            new Value(Integer.toString(i).getBytes(UTF_8)));
+        writer.append(new Key(new Text(row(i))), new Value(Integer.toString(i).getBytes(UTF_8)));
       }
     }
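Putting the pieces together, an end-to-end sketch in the style of the integration tests above; the connector, configuration, and directory are assumed to exist, and f1.rf is a hypothetical file holding rows 0000 through 0333:

import static java.nio.charset.StandardCharsets.UTF_8;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.LoadPlan.RangeType;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;

class EndToEndSketch {
  static void writeAndLoad(Connector c, FileSystem fs, AccumuloConfiguration aconf,
      String table, String dir) throws Exception {
    // Write a sorted RFile containing rows 0000 through 0333.
    FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder()
        .forFile(dir + "/f1.rf", fs, fs.getConf()).withTableConfiguration(aconf).build();
    writer.startDefaultLocalityGroup();
    for (int i = 0; i <= 333; i++) {
      writer.append(new Key(new Text(String.format("%04d", i))),
          new Value(Integer.toString(i).getBytes(UTF_8)));
    }
    writer.close();

    // Declare where the file's rows fall, then load without the client
    // re-examining the file to find overlapping tablets.
    LoadPlan plan = LoadPlan.builder().loadFileTo("f1.rf", RangeType.FILE, "0000", "0333").build();
    c.tableOperations().addFilesTo(table).from(dir).plan(plan).load();
  }
}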