storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [2/3] storm git commit: Fixed issues with not all maps allowing sideeffects.
Date Tue, 10 Apr 2018 09:08:26 GMT
Fixed issues with not all maps allowing sideeffects.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3fa24d1d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3fa24d1d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3fa24d1d

Branch: refs/heads/master
Commit: 3fa24d1dad5b5b4504c217f511c98d43aebf6a14
Parents: 337aef8
Author: Robert (Bobby) Evans <evans@yahoo-inc.com>
Authored: Mon Apr 9 16:52:25 2018 -0500
Committer: Robert (Bobby) Evans <evans@yahoo-inc.com>
Committed: Mon Apr 9 16:52:25 2018 -0500

----------------------------------------------------------------------
 .../org/apache/storm/localizer/AsyncLocalizer.java   | 15 +++++++++------
 .../apache/storm/localizer/AsyncLocalizerTest.java   |  5 +++--
 2 files changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3fa24d1d/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 1852d94..1cdcaad 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -75,10 +75,13 @@ public class AsyncLocalizer implements AutoCloseable {
 
     private final boolean isLocalMode;
     // track resources - user to resourceSet
-    protected final ConcurrentMap<String, ConcurrentMap<String, LocalizedResource>>
userFiles = new ConcurrentHashMap<>();
-    protected final ConcurrentMap<String, ConcurrentMap<String, LocalizedResource>>
userArchives = new ConcurrentHashMap<>();
+    //ConcurrentHashMap is explicitly used everywhere in this class because it uses locks
to guarantee atomicity for compute and
+    // computeIfAbsent where as ConcurrentMap allows for a retry of the function passed in,
and would require the function to have
+    // no side effects.
+    protected final ConcurrentHashMap<String, ConcurrentHashMap<String, LocalizedResource>>
userFiles = new ConcurrentHashMap<>();
+    protected final ConcurrentHashMap<String, ConcurrentHashMap<String, LocalizedResource>>
userArchives = new ConcurrentHashMap<>();
     // topology to tracking of topology dir and resources
-    private final Map<String, CompletableFuture<Void>> blobPending;
+    private final ConcurrentHashMap<String, CompletableFuture<Void>> blobPending;
     private final Map<String, Object> conf;
     private final AdvancedFSOps fsOps;
     private final boolean symlinksDisabled;
@@ -118,7 +121,7 @@ public class AsyncLocalizer implements AutoCloseable {
         reconstructLocalizedResources();
 
         symlinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
-        blobPending = new HashMap<>();
+        blobPending = new ConcurrentHashMap<>();
     }
 
     public AsyncLocalizer(Map<String, Object> conf) throws IOException {
@@ -629,12 +632,12 @@ public class AsyncLocalizer implements AutoCloseable {
     void cleanup() {
         LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(cacheTargetSize);
         // need one large set of all and then clean via LRU
-        for (Map.Entry<String, ConcurrentMap<String, LocalizedResource>> t :
userArchives.entrySet()) {
+        for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>>
t : userArchives.entrySet()) {
             toClean.addResources(t.getValue());
             LOG.debug("Resources to be cleaned after adding {} archives : {}", t.getKey(),
toClean);
         }
 
-        for (Map.Entry<String, ConcurrentMap<String, LocalizedResource>> t :
userFiles.entrySet()) {
+        for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>>
t : userFiles.entrySet()) {
             toClean.addResources(t.getValue());
             LOG.debug("Resources to be cleaned after adding {} files : {}", t.getKey(), toClean);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/3fa24d1d/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index 63f4e9c..00a3f98 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -51,6 +51,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -298,11 +299,11 @@ public class AsyncLocalizerTest {
         }
 
         // For testing, be careful as it doesn't clone
-        ConcurrentMap<String, ConcurrentMap<String, LocalizedResource>> getUserFiles()
{
+        ConcurrentHashMap<String, ConcurrentHashMap<String, LocalizedResource>>
getUserFiles() {
             return userFiles;
         }
 
-        ConcurrentMap<String, ConcurrentMap<String, LocalizedResource>> getUserArchives()
{
+        ConcurrentHashMap<String, ConcurrentHashMap<String, LocalizedResource>>
getUserArchives() {
             return userArchives;
         }
 


Mime
View raw message