From commits-return-13911-archive-asf-public=cust-asf.ponee.io@storm.apache.org Tue Apr 10 11:08:27 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 25B4118064C for ; Tue, 10 Apr 2018 11:08:26 +0200 (CEST) Received: (qmail 61534 invoked by uid 500); 10 Apr 2018 09:08:26 -0000 Mailing-List: contact commits-help@storm.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@storm.apache.org Delivered-To: mailing list commits@storm.apache.org Received: (qmail 61471 invoked by uid 99); 10 Apr 2018 09:08:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Apr 2018 09:08:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 93DD8F4E5A; Tue, 10 Apr 2018 09:08:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kabhwan@apache.org To: commits@storm.apache.org Date: Tue, 10 Apr 2018 09:08:26 -0000 Message-Id: In-Reply-To: <2b1ae389679b48728033b972e3d52193@git.apache.org> References: <2b1ae389679b48728033b972e3d52193@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] storm git commit: Fixed issues with not all maps allowing sideeffects. Fixed issues with not all maps allowing sideeffects. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3fa24d1d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3fa24d1d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3fa24d1d Branch: refs/heads/master Commit: 3fa24d1dad5b5b4504c217f511c98d43aebf6a14 Parents: 337aef8 Author: Robert (Bobby) Evans Authored: Mon Apr 9 16:52:25 2018 -0500 Committer: Robert (Bobby) Evans Committed: Mon Apr 9 16:52:25 2018 -0500 ---------------------------------------------------------------------- .../org/apache/storm/localizer/AsyncLocalizer.java | 15 +++++++++------ .../apache/storm/localizer/AsyncLocalizerTest.java | 5 +++-- 2 files changed, 12 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/3fa24d1d/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java index 1852d94..1cdcaad 100644 --- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java +++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java @@ -75,10 +75,13 @@ public class AsyncLocalizer implements AutoCloseable { private final boolean isLocalMode; // track resources - user to resourceSet - protected final ConcurrentMap> userFiles = new ConcurrentHashMap<>(); - protected final ConcurrentMap> userArchives = new ConcurrentHashMap<>(); + //ConcurrentHashMap is explicitly used everywhere in this class because it uses locks to guarantee atomicity for compute and + // computeIfAbsent where as ConcurrentMap allows for a retry of the function passed in, and would require the function to have + // no side effects. + protected final ConcurrentHashMap> userFiles = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap> userArchives = new ConcurrentHashMap<>(); // topology to tracking of topology dir and resources - private final Map> blobPending; + private final ConcurrentHashMap> blobPending; private final Map conf; private final AdvancedFSOps fsOps; private final boolean symlinksDisabled; @@ -118,7 +121,7 @@ public class AsyncLocalizer implements AutoCloseable { reconstructLocalizedResources(); symlinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false); - blobPending = new HashMap<>(); + blobPending = new ConcurrentHashMap<>(); } public AsyncLocalizer(Map conf) throws IOException { @@ -629,12 +632,12 @@ public class AsyncLocalizer implements AutoCloseable { void cleanup() { LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(cacheTargetSize); // need one large set of all and then clean via LRU - for (Map.Entry> t : userArchives.entrySet()) { + for (Map.Entry> t : userArchives.entrySet()) { toClean.addResources(t.getValue()); LOG.debug("Resources to be cleaned after adding {} archives : {}", t.getKey(), toClean); } - for (Map.Entry> t : userFiles.entrySet()) { + for (Map.Entry> t : userFiles.entrySet()) { toClean.addResources(t.getValue()); LOG.debug("Resources to be cleaned after adding {} files : {}", t.getKey(), toClean); } http://git-wip-us.apache.org/repos/asf/storm/blob/3fa24d1d/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java index 63f4e9c..00a3f98 100644 --- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java +++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -298,11 +299,11 @@ public class AsyncLocalizerTest { } // For testing, be careful as it doesn't clone - ConcurrentMap> getUserFiles() { + ConcurrentHashMap> getUserFiles() { return userFiles; } - ConcurrentMap> getUserArchives() { + ConcurrentHashMap> getUserArchives() { return userArchives; }