From commits-return-24326-archive-asf-public=cust-asf.ponee.io@accumulo.apache.org Wed Nov 4 21:35:39 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-ec2-va.apache.org (mxout1-ec2-va.apache.org [3.227.148.255]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id 24C5F18060E for ; Wed, 4 Nov 2020 22:35:39 +0100 (CET) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-ec2-va.apache.org (ASF Mail Server at mxout1-ec2-va.apache.org) with SMTP id 683E543400 for ; Wed, 4 Nov 2020 21:35:38 +0000 (UTC) Received: (qmail 57545 invoked by uid 500); 4 Nov 2020 21:35:38 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 57536 invoked by uid 99); 4 Nov 2020 21:35:38 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Nov 2020 21:35:38 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id B5D5F820EB; Wed, 4 Nov 2020 21:35:37 +0000 (UTC) Date: Wed, 04 Nov 2020 21:35:37 +0000 To: "commits@accumulo.apache.org" Subject: [accumulo] branch main updated: Minor cleanup in BulkImport (#1769) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <160452573740.22609.3954820319588570380@gitbox.apache.org> From: mmiller@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: accumulo X-Git-Refname: refs/heads/main X-Git-Reftype: branch X-Git-Oldrev: 5e0bbfcf7fe74a41cc3283d3f255162a5900c671 X-Git-Newrev: 46ee51d0c6afe9910873f55653017b91c45a345a X-Git-Rev: 46ee51d0c6afe9910873f55653017b91c45a345a X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/main by this push: new 46ee51d Minor cleanup in BulkImport (#1769) 46ee51d is described below commit 46ee51d0c6afe9910873f55653017b91c45a345a Author: Mike Miller AuthorDate: Wed Nov 4 16:35:25 2020 -0500 Minor cleanup in BulkImport (#1769) * Remove exceptions from BulkImport and ConcurrentKeyExtentCache internal classes that were declared but never thrown * Remove other unused variables * Shorten a few lines that use lambdas --- .../accumulo/core/clientImpl/bulk/BulkImport.java | 35 ++++++---------------- .../clientImpl/bulk/ConcurrentKeyExtentCache.java | 9 ++---- .../bulk/ConcurrentKeyExtentCacheTest.java | 13 ++------ 3 files changed, 14 insertions(+), 43 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java index 1a60318..160db69 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java @@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.stream.Collectors.groupingBy; +import static org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.pathToCacheId; import java.io.FileNotFoundException; import java.io.IOException; @@ -75,7 +76,6 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; -import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import org.apache.accumulo.core.spi.crypto.CryptoService; import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.accumulo.fate.util.Retry; @@ -302,35 +302,25 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti } public interface KeyExtentCache { - KeyExtent lookup(Text row) - throws AccumuloException, AccumuloSecurityException, TableNotFoundException; + KeyExtent lookup(Text row); } public static List findOverlappingTablets(KeyExtentCache extentCache, - FileSKVIterator reader) - throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException { - - Text startRow = null; - Text endRow = null; + FileSKVIterator reader) throws IOException { List result = new ArrayList<>(); Collection columnFamilies = Collections.emptyList(); - Text row = startRow; - if (row == null) - row = new Text(); + Text row = new Text(); while (true) { - // log.debug(filename + " Seeking to row " + row); reader.seek(new Range(row, null), columnFamilies, false); if (!reader.hasTop()) { - // log.debug(filename + " not found"); break; } row = reader.getTopKey().getRow(); KeyExtent extent = extentCache.lookup(row); - // log.debug(filename + " found row " + row + " at location " + tabletLocation); result.add(extent); row = extent.endRow(); - if (row != null && (endRow == null || row.compareTo(endRow) < 0)) { + if (row != null) { row = nextRow(row); } else break; @@ -347,8 +337,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti public static List findOverlappingTablets(ClientContext context, KeyExtentCache extentCache, Path file, FileSystem fs, Cache fileLenCache, - CryptoService cs) - throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException { + CryptoService cs) throws IOException { try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() .forFile(file.toString(), fs, fs.getConf(), cs) .withTableConfiguration(context.getConfiguration()).withFileLenCache(fileLenCache) @@ -361,7 +350,6 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti HashMap fileLens = new HashMap<>(); for (FileStatus status : statuses) { fileLens.put(status.getPath().getName(), status.getLen()); - status.getLen(); } return fileLens; @@ -371,9 +359,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti Map fileLens = getFileLenMap(statuses); Map absFileLens = new HashMap<>(); - fileLens.forEach((k, v) -> { - absFileLens.put(CachableBlockFile.pathToCacheId(new Path(dir, k)), v); - }); + fileLens.forEach((k, v) -> absFileLens.put(pathToCacheId(new Path(dir, k)), v)); Cache fileLenCache = CacheBuilder.newBuilder().build(); @@ -440,8 +426,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti } private Set mapDestinationsToExtents(TableId tableId, KeyExtentCache kec, - List destinations) - throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + List destinations) { Set extents = new HashSet<>(); for (Destination dest : destinations) { @@ -570,9 +555,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti for (CompletableFuture> future : futures) { try { Map pathMapping = future.get(); - pathMapping.forEach((extent, path) -> { - mappings.computeIfAbsent(extent, k -> new Bulk.Files()).add(path); - }); + pathMapping.forEach((ext, fi) -> mappings.computeIfAbsent(ext, k -> new Files()).add(fi)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java index c31695c..901c672 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java @@ -31,9 +31,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; import java.util.stream.Stream; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.bulk.BulkImport.KeyExtentCache; import org.apache.accumulo.core.data.TableId; @@ -85,15 +82,13 @@ class ConcurrentKeyExtentCache implements KeyExtentCache { } @VisibleForTesting - protected Stream lookupExtents(Text row) - throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + protected Stream lookupExtents(Text row) { return TabletsMetadata.builder().forTable(tableId).overlapping(row, null).checkConsistency() .fetch(PREV_ROW).build(ctx).stream().limit(100).map(TabletMetadata::getExtent); } @Override - public KeyExtent lookup(Text row) - throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + public KeyExtent lookup(Text row) { while (true) { KeyExtent ke = getFromCache(row); if (ke != null) diff --git a/core/src/test/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCacheTest.java b/core/src/test/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCacheTest.java index c051499..c17298c 100644 --- a/core/src/test/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCacheTest.java +++ b/core/src/test/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCacheTest.java @@ -31,9 +31,6 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.hadoop.io.Text; @@ -92,13 +89,9 @@ public class ConcurrentKeyExtentCacheTest { } private void testLookup(TestCache tc, Text lookupRow) { - try { - KeyExtent extent = tc.lookup(lookupRow); - assertTrue(extent.contains(lookupRow)); - assertTrue(extentsSet.contains(extent)); - } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { - throw new RuntimeException(e); - } + KeyExtent extent = tc.lookup(lookupRow); + assertTrue(extent.contains(lookupRow)); + assertTrue(extentsSet.contains(extent)); } @Test