asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From il...@apache.org
Subject asterixdb git commit: Fixed race condition during ncMap lookup
Date Wed, 31 May 2017 05:45:26 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 356f6c6a0 -> ffd6e4ac5


Fixed race condition during ncMap lookup

Change-Id: I1bfbe712c100f48011a516c373ac8994028dc3dd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1792
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/ffd6e4ac
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/ffd6e4ac
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/ffd6e4ac

Branch: refs/heads/master
Commit: ffd6e4ac51ded48ee5e538fe4a2334d74068d414
Parents: 356f6c6
Author: Ildar Absalyamov <ildar.absalyamov@gmail.com>
Authored: Tue May 30 18:26:45 2017 -0700
Committer: Ildar Absalyamov <ildar.absalyamov@gmail.com>
Committed: Tue May 30 22:44:36 2017 -0700

----------------------------------------------------------------------
 .../asterix/external/api/INodeResolver.java     |  7 ++-
 .../factory/LocalFSInputStreamFactory.java      |  8 ++-
 .../asterix/external/util/IdentityResolver.java |  7 ++-
 .../asterix/external/util/NodeResolver.java     | 58 +++++---------------
 .../asterix/runtime/utils/RuntimeUtils.java     |  4 +-
 5 files changed, 35 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ffd6e4ac/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/INodeResolver.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/INodeResolver.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/INodeResolver.java
index 99ffdf1..c6e87bd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/INodeResolver.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/INodeResolver.java
@@ -18,6 +18,10 @@
  */
 package org.apache.asterix.external.api;
 
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 
@@ -35,5 +39,6 @@ public interface INodeResolver {
      * @return resolved result (a node controller id)
      * @throws AsterixException
      */
-    String resolveNode(ICcApplicationContext appCtx, String value) throws AsterixException;
+    String resolveNode(ICcApplicationContext appCtx, String value, Map<InetAddress, Set<String>>
ncMap, Set<String> ncs)
+            throws AsterixException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ffd6e4ac/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
index 44b0b43..d7afa13 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
@@ -18,13 +18,16 @@
  */
 package org.apache.asterix.external.input.stream.factory;
 
+import java.net.InetAddress;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -37,6 +40,7 @@ import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FileSystemWatcher;
 import org.apache.asterix.external.util.NodeResolverFactory;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -103,6 +107,8 @@ public class LocalFSInputStreamFactory implements IInputStreamFactory
{
 
     private void configureFileSplits(ICcApplicationContext appCtx, String[] splits) throws
AsterixException {
         INodeResolver resolver = getNodeResolver();
+        Map<InetAddress, Set<String>> ncMap = RuntimeUtils.getForcedNodeControllerMap(appCtx);
+        Set<String> ncs = ncMap.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
         inputFileSplits = new UnmanagedFileSplit[splits.length];
         String node;
         String path;
@@ -114,7 +120,7 @@ public class LocalFSInputStreamFactory implements IInputStreamFactory
{
                 throw new AsterixException(
                         "Invalid path: " + splitPath + "\nUsage- path=\"Host://Absolute File
Path\"");
             }
-            node = resolver.resolveNode(appCtx, trimmedValue.split(":")[0]);
+            node = resolver.resolveNode(appCtx, trimmedValue.split(":")[0], ncMap, ncs);
             path = trimmedValue.split("://")[1];
             inputFileSplits[count++] = new UnmanagedFileSplit(node, path);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ffd6e4ac/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java
index 9a4ddff..651833f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java
@@ -18,6 +18,10 @@
  */
 package org.apache.asterix.external.util;
 
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.api.INodeResolver;
 
@@ -28,7 +32,8 @@ import org.apache.asterix.external.api.INodeResolver;
 public class IdentityResolver implements INodeResolver {
 
     @Override
-    public String resolveNode(ICcApplicationContext appCtx, String value) {
+    public String resolveNode(ICcApplicationContext appCtx, String value, Map<InetAddress,
Set<String>> ncMap,
+            Set<String> ncs) {
         return value;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ffd6e4ac/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
index 5b15e9e..c180dea 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
@@ -20,10 +20,7 @@ package org.apache.asterix.external.util;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 
@@ -31,57 +28,30 @@ import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.api.INodeResolver;
-import org.apache.asterix.runtime.utils.RuntimeUtils;
 
 /**
  * Resolves a value (DNS/IP Address) or a (Node Controller Id) to the id of a Node Controller
running at the location.
  */
 public class NodeResolver implements INodeResolver {
     //TODO: change this call and replace by calling AsterixClusterProperties
-    private static final Random random = new Random();
-    private static final Map<InetAddress, Set<String>> ncMap = new HashMap<>();
-    private static final Set<String> ncs = new HashSet<>();
+    private final Random random = new Random();
 
     @Override
-    public String resolveNode(ICcApplicationContext appCtx, String value) throws AsterixException
{
+    public String resolveNode(ICcApplicationContext appCtx, String value, Map<InetAddress,
Set<String>> ncMap,
+            Set<String> ncs) throws AsterixException {
+        if (ncs.contains(value)) {
+            return value;
+        }
+        InetAddress ipAddress = null;
         try {
-            if (ncMap.isEmpty()) {
-                NodeResolver.updateNCs(appCtx);
-            }
-            if (ncs.contains(value)) {
-                return value;
-            } else {
-                NodeResolver.updateNCs(appCtx);
-                if (ncs.contains(value)) {
-                    return value;
-                }
-            }
-            InetAddress ipAddress = null;
-            try {
-                ipAddress = InetAddress.getByName(value);
-            } catch (UnknownHostException e) {
-                throw new AsterixException(ErrorCode.NODE_RESOLVER_UNABLE_RESOLVE_HOST, e,
value);
-            }
-            Set<String> nodeControllers = ncMap.get(ipAddress);
-            if (nodeControllers == null || nodeControllers.isEmpty()) {
-                throw new AsterixException(ErrorCode.NODE_RESOLVER_NO_NODE_CONTROLLERS, value);
-            }
-            return nodeControllers.toArray(new String[] {})[random.nextInt(nodeControllers.size())];
-        } catch (Exception e) {
-            throw new AsterixException(e);
+            ipAddress = InetAddress.getByName(value);
+        } catch (UnknownHostException e) {
+            throw new AsterixException(ErrorCode.NODE_RESOLVER_UNABLE_RESOLVE_HOST, e, value);
         }
-    }
-
-    private static void updateNCs(ICcApplicationContext appCtx) throws Exception {
-        synchronized (ncMap) {
-            ncMap.clear();
-            RuntimeUtils.getNodeControllerMap(appCtx, ncMap);
-            synchronized (ncs) {
-                ncs.clear();
-                for (Entry<InetAddress, Set<String>> entry : ncMap.entrySet())
{
-                    ncs.addAll(entry.getValue());
-                }
-            }
+        Set<String> nodeControllers = ncMap.get(ipAddress);
+        if (nodeControllers == null || nodeControllers.isEmpty()) {
+            throw new AsterixException(ErrorCode.NODE_RESOLVER_NO_NODE_CONTROLLERS, value);
         }
+        return nodeControllers.toArray(new String[] {})[random.nextInt(nodeControllers.size())];
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ffd6e4ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java
index 85e93b8..6970af5 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java
@@ -60,10 +60,10 @@ public class RuntimeUtils {
         return map;
     }
 
-    public static void getNodeControllerMap(ICcApplicationContext appCtx, Map<InetAddress,
Set<String>> map) {
+    public static Map<InetAddress, Set<String>> getForcedNodeControllerMap(ICcApplicationContext
appCtx) {
         ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
         INodeManager nodeManager = ccs.getNodeManager();
-        map.putAll(nodeManager.getIpAddressNodeNameMap());
+        return nodeManager.getIpAddressNodeNameMap();
     }
 
     public static JobSpecification createJobSpecification(ICcApplicationContext appCtx) {


Mime
View raw message