accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] branch 1.9 updated: fix #1153 preventing multiple threads working on same bulk file (#1157)
Date Fri, 10 May 2019 20:20:50 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.9 by this push:
     new f2bf2d8  fix #1153 preventing multiple threads working on same bulk file (#1157)
f2bf2d8 is described below

commit f2bf2d8daebf554630be41ae37db3c4c44b975aa
Author: Keith Turner <kturner@apache.org>
AuthorDate: Fri May 10 16:20:45 2019 -0400

    fix #1153 preventing multiple threads working on same bulk file (#1157)
---
 .../org/apache/accumulo/tserver/tablet/Tablet.java | 23 ++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 849b2b0..3366e9e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -255,6 +255,11 @@ public class Tablet implements TabletCommitter {
 
   private final ConfigurationObserver configObserver;
 
+  // Files that are currently in the process of bulk importing. Access to this is protected by the
+  // tablet lock.
+  private final Set<FileRef> bulkImporting = new HashSet<>();
+
+  // Files that were successfully bulk imported.
   private final Cache<Long,List<FileRef>> bulkImported = CacheBuilder.newBuilder().build();
 
   private final int logId;
@@ -2437,6 +2442,16 @@ public class Tablet implements TabletCommitter {
           }
         }
       }
+
+      Iterator<FileRef> fiter = fileMap.keySet().iterator();
+      while (fiter.hasNext()) {
+        FileRef file = fiter.next();
+        if (bulkImporting.contains(file)) {
+          log.info("Ignoring import of bulk file currently importing: " + file);
+          fiter.remove();
+        }
+      }
+
       if (fileMap.isEmpty()) {
         return;
       }
@@ -2447,6 +2462,9 @@ public class Tablet implements TabletCommitter {
             + extent);
       }
 
+      // prevent other threads from processing this file while its added to the metadata table.
+      bulkImporting.addAll(fileMap.keySet());
+
       writesInProgress++;
     }
     try {
@@ -2471,6 +2489,11 @@ public class Tablet implements TabletCommitter {
         if (writesInProgress == 0)
           this.notifyAll();
 
+        if (!bulkImporting.removeAll(fileMap.keySet())) {
+          throw new AssertionError(
+              "Likely bug in code, always expect to remove something.  Please open an Accumulo issue.");
+        }
+
         try {
           bulkImported.get(tid, new Callable<List<FileRef>>() {
             @Override


Mime
View raw message