From commits-return-22895-archive-asf-public=cust-asf.ponee.io@accumulo.apache.org Fri May 10 20:20:51 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 118B918061A for ; Fri, 10 May 2019 22:20:50 +0200 (CEST) Received: (qmail 70309 invoked by uid 500); 10 May 2019 20:20:50 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 70300 invoked by uid 99); 10 May 2019 20:20:50 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 May 2019 20:20:50 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 399D0872F9; Fri, 10 May 2019 20:20:50 +0000 (UTC) Date: Fri, 10 May 2019 20:20:50 +0000 To: "commits@accumulo.apache.org" Subject: [accumulo] branch 1.9 updated: fix #1153 preventing multiple threads working on same bulk file (#1157) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <155751965010.32430.7271503908973481091@gitbox.apache.org> From: kturner@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: accumulo X-Git-Refname: refs/heads/1.9 X-Git-Reftype: branch X-Git-Oldrev: c56eadabf913f981fa10a51f9f874b4f6287351a X-Git-Newrev: f2bf2d8daebf554630be41ae37db3c4c44b975aa X-Git-Rev: f2bf2d8daebf554630be41ae37db3c4c44b975aa X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 1.9 in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/1.9 by this push: new f2bf2d8 fix #1153 preventing multiple threads working on same bulk file (#1157) f2bf2d8 is described below commit f2bf2d8daebf554630be41ae37db3c4c44b975aa Author: Keith Turner AuthorDate: Fri May 10 16:20:45 2019 -0400 fix #1153 preventing multiple threads working on same bulk file (#1157) --- .../org/apache/accumulo/tserver/tablet/Tablet.java | 23 ++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 849b2b0..3366e9e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -255,6 +255,11 @@ public class Tablet implements TabletCommitter { private final ConfigurationObserver configObserver; + // Files that are currently in the process of bulk importing. Access to this is protected by the + // tablet lock. + private final Set bulkImporting = new HashSet<>(); + + // Files that were successfully bulk imported. private final Cache> bulkImported = CacheBuilder.newBuilder().build(); private final int logId; @@ -2437,6 +2442,16 @@ public class Tablet implements TabletCommitter { } } } + + Iterator fiter = fileMap.keySet().iterator(); + while (fiter.hasNext()) { + FileRef file = fiter.next(); + if (bulkImporting.contains(file)) { + log.info("Ignoring import of bulk file currently importing: " + file); + fiter.remove(); + } + } + if (fileMap.isEmpty()) { return; } @@ -2447,6 +2462,9 @@ public class Tablet implements TabletCommitter { + extent); } + // prevent other threads from processing this file while its added to the metadata table. + bulkImporting.addAll(fileMap.keySet()); + writesInProgress++; } try { @@ -2471,6 +2489,11 @@ public class Tablet implements TabletCommitter { if (writesInProgress == 0) this.notifyAll(); + if (!bulkImporting.removeAll(fileMap.keySet())) { + throw new AssertionError( + "Likely bug in code, always expect to remove something. Please open an Accumulo issue."); + } + try { bulkImported.get(tid, new Callable>() { @Override