accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] branch master updated: fixes #518 ignore non rfiles in new bulk import (#773)
Date Tue, 20 Nov 2018 13:15:02 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new e5106ef  fixes #518 ignore non rfiles in new bulk import (#773)
e5106ef is described below

commit e5106efb9613468b46b4f94e549d81012631a34b
Author: Keith Turner <kturner@apache.org>
AuthorDate: Tue Nov 20 08:14:57 2018 -0500

    fixes #518 ignore non rfiles in new bulk import (#773)
---
 .../accumulo/core/clientImpl/BulkImport.java       | 46 +++++++++++++++++++---
 .../accumulo/test/functional/BulkLoadIT.java       |  6 +++
 2 files changed, 46 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java
index 426cf0b..3d8da28 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java
@@ -312,7 +312,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
     }
   }
 
-  private static Map<String,Long> getFileLenMap(FileStatus[] statuses) {
+  private static Map<String,Long> getFileLenMap(List<FileStatus> statuses) {
     HashMap<String,Long> fileLens = new HashMap<>();
     for (FileStatus status : statuses) {
       fileLens.put(status.getPath().getName(), status.getLen());
@@ -322,7 +322,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
     return fileLens;
   }
 
-  private static Cache<String,Long> getPopulatedFileLenCache(Path dir, FileStatus[] statuses) {
+  private static Cache<String,Long> getPopulatedFileLenCache(Path dir, List<FileStatus> statuses) {
     Map<String,Long> fileLens = getFileLenMap(statuses);
 
     Map<String,Long> absFileLens = new HashMap<>();
@@ -343,8 +343,8 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
     Map<String,List<Destination>> fileDestinations = plan.getDestinations().stream()
         .collect(groupingBy(Destination::getFileName));
 
-    FileStatus[] statuses = fs.listStatus(srcPath,
-        p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING));
+    List<FileStatus> statuses = filterInvalid(
+        fs.listStatus(srcPath, p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING)));
 
     Map<String,Long> fileLens = getFileLenMap(statuses);
 
@@ -447,13 +447,47 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
 
   }
 
+  private static List<FileStatus> filterInvalid(FileStatus[] files) {
+    ArrayList<FileStatus> fileList = new ArrayList<>(files.length);
+
+    for (FileStatus fileStatus : files) {
+
+      String fname = fileStatus.getPath().getName();
+
+      if (fname.equals("_SUCCESS") || fname.equals("_logs")) {
+        log.debug("Ignoring file likely created by map reduce : {}", fileStatus.getPath());
+        continue;
+      }
+
+      if (fileStatus.isDirectory()) {
+        log.warn("{} is a directory, ignoring.", fileStatus.getPath());
+        continue;
+      }
+
+      String sa[] = fname.split("\\.");
+      String extension = "";
+      if (sa.length > 1) {
+        extension = sa[sa.length - 1];
+      }
+
+      if (!FileOperations.getValidExtensions().contains(extension)) {
+        log.warn("{} does not have a valid extension, ignoring", fileStatus.getPath());
+        continue;
+      }
+
+      fileList.add(fileStatus);
+    }
+
+    return fileList;
+  }
+
  public static SortedMap<KeyExtent,Bulk.Files> computeFileToTabletMappings(FileSystem fs,
      Table.ID tableId, Path dirPath, Executor executor, ClientContext context) throws IOException {
 
     KeyExtentCache extentCache = new ConcurrentKeyExtentCache(tableId, context);
 
-    FileStatus[] files = fs.listStatus(dirPath,
-        p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING));
+    List<FileStatus> files = filterInvalid(
+        fs.listStatus(dirPath, p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING)));
 
     // we know all of the file lens, so construct a cache and populate it in order to avoid later
     // trips to the namenode
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
index 34f2707..24fd475 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
@@ -59,6 +59,7 @@ import org.apache.accumulo.minicluster.MemoryUnit;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
@@ -231,6 +232,11 @@ public class BulkLoadIT extends AccumuloClusterHarness {
         hashes.put(endRow, new HashSet<>());
       }
 
+      // Add a junk file, should be ignored
+      FSDataOutputStream out = fs.create(new Path(dir, "junk"));
+      out.writeChars("ABCDEFG\n");
+      out.close();
+
       // 1 Tablet 0333-null
       String h1 = writeData(dir + "/f1.", aconf, 0, 333);
       hashes.get("0333").add(h1);


Mime
View raw message