hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From inigo...@apache.org
Subject [2/5] hadoop git commit: HDFS-13224. RBF: Resolvers to support mount points across multiple subclusters. Contributed by Inigo Goiri.
Date Thu, 15 Mar 2018 17:52:15 GMT
HDFS-13224. RBF: Resolvers to support mount points across multiple subclusters. Contributed by Inigo Goiri.

(cherry picked from commit e71bc00a471422ddb26dd54e706f09f0fe09925c)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/13f2ee05
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/13f2ee05
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/13f2ee05

Branch: refs/heads/branch-3.1
Commit: 13f2ee05e92c595ada01148ccfd58c745a75ffc5
Parents: 87f6d33
Author: Inigo Goiri <inigoiri@apache.org>
Authored: Thu Mar 15 10:32:30 2018 -0700
Committer: Inigo Goiri <inigoiri@apache.org>
Committed: Thu Mar 15 10:34:45 2018 -0700

----------------------------------------------------------------------
 .../federation/resolver/MountTableResolver.java |   6 +-
 .../MultipleDestinationMountTableResolver.java  | 116 +++++
 .../resolver/order/DestinationOrder.java        |   5 +-
 .../resolver/order/HashFirstResolver.java       |  71 ++++
 .../federation/resolver/order/HashResolver.java | 137 ++++++
 .../resolver/order/LocalResolver.java           | 297 +++++++++++++
 .../resolver/order/OrderedResolver.java         |  37 ++
 .../resolver/order/RandomResolver.java          |  62 +++
 .../federation/router/RouterRpcServer.java      |  49 ++-
 .../store/records/impl/pb/MountTablePBImpl.java |   4 +
 .../federation/utils/ConsistentHashRing.java    | 144 +++++++
 .../server/federation/utils/package-info.java   |  28 ++
 .../hdfs/tools/federation/RouterAdmin.java      |   3 +-
 .../src/main/proto/FederationProtocol.proto     |   1 +
 .../resolver/TestMountTableResolver.java        |   6 +-
 .../TestMultipleDestinationResolver.java        | 419 +++++++++++++++++++
 .../resolver/order/TestLocalResolver.java       | 143 +++++++
 .../federation/router/TestRouterAdmin.java      |  48 +++
 .../federation/router/TestRouterAdminCLI.java   |  34 ++
 19 files changed, 1589 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f2ee05/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java
index 27b43e5..3c45faf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java
@@ -240,7 +240,7 @@ public class MountTableResolver
       PathLocation loc = entry.getValue();
       String src = loc.getSourcePath();
       if (src != null) {
-        if(isParentEntry(src, path)) {
+        if (isParentEntry(src, path)) {
           LOG.debug("Removing {}", src);
           it.remove();
         }
@@ -306,7 +306,7 @@ public class MountTableResolver
                 existingEntry, entry);
             this.tree.put(srcPath, entry);
             invalidateLocationCache(srcPath);
-            LOG.info("Updated mount point {} in resolver");
+            LOG.info("Updated mount point {} in resolver", srcPath);
           }
         }
       }
@@ -515,7 +515,7 @@ public class MountTableResolver
       String nsId = oneDst.getNameserviceId();
       String dest = oneDst.getDest();
       String newPath = dest;
-      if (!newPath.endsWith(Path.SEPARATOR)) {
+      if (!newPath.endsWith(Path.SEPARATOR) && !remainingPath.isEmpty()) {
         newPath += Path.SEPARATOR;
       }
       newPath += remainingPath;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f2ee05/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java
new file mode 100644
index 0000000..8dd73ec
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MultipleDestinationMountTableResolver.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import java.io.IOException;
+import java.util.EnumMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.HashFirstResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.HashResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.LocalResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.OrderedResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.RandomResolver;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Mount table resolver that supports multiple locations for each mount entry.
+ * The returned location contains prioritized remote paths from highest priority
+ * to the lowest priority. Multiple locations for a mount point are optional.
+ * When multiple locations are specified, all will be checked for the presence
+ * of a file and the nameservice for a new file/dir is chosen based on the
+ * results of a consistent hashing algorithm.
+ * <p>
+ * Does the Mount table entry for this path have multiple destinations?
+ * <ul>
+ * <li>No -&gt; Return the location
+ * <li>Yes -&gt; Return all locations, prioritizing the best guess from the
+ * consistent hashing algorithm.
+ * </ul>
+ * <p>
+ * It has multiple options to order the locations: HASH (default), LOCAL,
+ * RANDOM, and HASH_ALL.
+ * <p>
+ * The consistent hashing result is dependent on the number and combination of
+ * nameservices that are registered for a particular mount point. The order of
+ * nameservices/locations in the mount table is not prioritized. Each consistent
+ * hash calculation considers only the set of unique nameservices present for
+ * the mount table location.
+ */
+public class MultipleDestinationMountTableResolver extends MountTableResolver {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MultipleDestinationMountTableResolver.class);
+
+
+  /** Resolvers that use a particular order for multiple destinations. */
+  private EnumMap<DestinationOrder, OrderedResolver> orderedResolvers =
+      new EnumMap<>(DestinationOrder.class);
+
+
+  public MultipleDestinationMountTableResolver(
+      Configuration conf, Router router) {
+    super(conf, router);
+
+    // Initialize the ordered resolvers
+    addResolver(DestinationOrder.HASH, new HashFirstResolver());
+    addResolver(DestinationOrder.LOCAL, new LocalResolver(conf, router));
+    addResolver(DestinationOrder.RANDOM, new RandomResolver());
+    addResolver(DestinationOrder.HASH_ALL, new HashResolver());
+  }
+
+  @Override
+  public PathLocation getDestinationForPath(String path) throws IOException {
+    PathLocation mountTableResult = super.getDestinationForPath(path);
+    if (mountTableResult == null) {
+      LOG.error("The {} cannot find a location for {}",
+          super.getClass().getSimpleName(), path);
+    } else if (mountTableResult.hasMultipleDestinations()) {
+      DestinationOrder order = mountTableResult.getDestinationOrder();
+      OrderedResolver orderedResolver = orderedResolvers.get(order);
+      if (orderedResolver == null) {
+        LOG.error("Cannot find resolver for order {}", order);
+      } else {
+        String firstNamespace =
+            orderedResolver.getFirstNamespace(path, mountTableResult);
+
+        // Change the order of the name spaces according to the policy
+        if (firstNamespace != null) {
+          // This is the entity in the tree, we need to create our own copy
+          mountTableResult = new PathLocation(mountTableResult, firstNamespace);
+          LOG.debug("Ordered locations following {} are {}",
+              order, mountTableResult);
+        } else {
+          LOG.error("Cannot get main namespace for path {} with order {}",
+              path, order);
+        }
+      }
+    }
+    return mountTableResult;
+  }
+
+  @VisibleForTesting
+  public void addResolver(DestinationOrder order, OrderedResolver resolver) {
+    orderedResolvers.put(order, resolver);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f2ee05/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
index 4bccf10..03e68e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
@@ -23,7 +23,8 @@ package org.apache.hadoop.hdfs.server.federation.resolver.order;
  * this determines which location should be checked first.
  */
 public enum DestinationOrder {
-  HASH, // Follow consistent hashing
+  HASH, // Follow consistent hashing in the first folder level
   LOCAL, // Local first
-  RANDOM // Random order
+  RANDOM, // Random order
+  HASH_ALL // Follow consistent hashing
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f2ee05/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashFirstResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashFirstResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashFirstResolver.java
new file mode 100644
index 0000000..831b082
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashFirstResolver.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+
+/**
+ * Variation of HashResolver that only uses the first level of the path.
+ */
+public class HashFirstResolver extends HashResolver {
+
+  @Override
+  public String getFirstNamespace(final String path, final PathLocation loc) {
+    String srcPath = loc.getSourcePath();
+    String trimmedPath = trimPathToChild(path, srcPath);
+    LOG.debug("Only using the first part of the path: {} -> {}",
+        path, trimmedPath);
+    return super.getFirstNamespace(trimmedPath, loc);
+  }
+
+  /**
+   * Hash only up to the immediate child of the mount point. This prevents the
+   * need to create/maintain subtrees under each multi-destination mount point.
+   * Each child of a multi-destination mount is mapped to only one hash
+   * location.
+   * <p>
+   * Trims a path to at most the immediate child of a parent path. For example:
+   * <ul>
+   * <li>path = /a/b/c, parent = /a will be trimmed to /a/b.
+   * <li>path = /a/b, parent = /a/b will be trimmed to /a/b
+   * </ul>
+   *
+   * @param path The path to trim.
+   * @param parent The parent used to find the immediate child.
+   * @return Trimmed path.
+   */
+  private static String trimPathToChild(String path, String parent) {
+    // Path is invalid or equal to the parent
+    if (path.length() <= parent.length()) {
+      return parent;
+    }
+    String remainder = path.substring(parent.length());
+    String[] components =
+        remainder.replaceFirst("^/", "").split(Path.SEPARATOR);
+    if (components.length > 0 && components[0].length() > 0) {
+      if (parent.endsWith(Path.SEPARATOR)) {
+        return parent + components[0];
+      } else {
+        return parent + Path.SEPARATOR + components[0];
+      }
+    } else {
+      return parent;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f2ee05/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashResolver.java
new file mode 100644
index 0000000..4034a46
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/HashResolver.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.utils.ConsistentHashRing;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Order the destinations based on consistent hashing.
+ */
+public class HashResolver implements OrderedResolver {
+
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(HashResolver.class);
+
+
+  /** Namespace set hash -> Locator. */
+  private final Map<Integer, ConsistentHashRing> hashResolverMap;
+
+  /** Patterns for temporary files. */
+  private static final String HEX_PATTERN = "\\p{XDigit}";
+  private static final String UUID_PATTERN = HEX_PATTERN + "{8}-" +
+      HEX_PATTERN + "{4}-" + HEX_PATTERN + "{4}-" + HEX_PATTERN + "{4}-" +
+      HEX_PATTERN + "{12}";
+  private static final String ATTEMPT_PATTERN =
+      "attempt_\\d+_\\d{4}_._\\d{6}_\\d{2}";
+  private static final String[] TEMP_FILE_PATTERNS = {
+      "(.+)\\.COPYING$",
+      "(.+)\\._COPYING_.*$",
+      "(.+)\\.tmp$",
+      "_temp/(.+)$",
+      "_temporary/(.+)\\." + UUID_PATTERN + "$",
+      "(.*)_temporary/\\d/_temporary/" + ATTEMPT_PATTERN + "/(.+)$" };
+  /** Pattern for temporary files (logical OR of the individual patterns). */
+  private static final Pattern TEMP_FILE_PATTERN =
+      Pattern.compile(StringUtils.join("|", TEMP_FILE_PATTERNS));
+
+
+  public HashResolver() {
+    this.hashResolverMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Use the result from consistent hashing locator to prioritize the locations
+   * for a path.
+   *
+   * @param path Path to check.
+   * @param loc Federated location with multiple destinations.
+   * @return First namespace based on hash.
+   */
+  @Override
+  public String getFirstNamespace(final String path, final PathLocation loc) {
+    String finalPath = extractTempFileName(path);
+    Set<String> namespaces = loc.getNamespaces();
+    ConsistentHashRing locator = getHashResolver(namespaces);
+    String hashedSubcluster = locator.getLocation(finalPath);
+    if (hashedSubcluster == null) {
+      String srcPath = loc.getSourcePath();
+      LOG.error("Cannot find subcluster for {} ({} -> {})",
+          srcPath, path, finalPath);
+    }
+    LOG.debug("Namespace for {} ({}) is {}", path, finalPath, hashedSubcluster);
+    return hashedSubcluster;
+  }
+
+  /**
+   * Get the cached (if available) or generate a new hash resolver for this
+   * particular set of unique namespace identifiers.
+   *
+   * @param namespaces A set of unique namespace identifiers.
+   * @return A hash resolver configured to consistently resolve paths to
+   *         namespaces using the provided set of namespace identifiers.
+   */
+  private ConsistentHashRing getHashResolver(final Set<String> namespaces) {
+    int hash = namespaces.hashCode();
+    ConsistentHashRing resolver = this.hashResolverMap.get(hash);
+    if (resolver == null) {
+      resolver = new ConsistentHashRing(namespaces);
+      this.hashResolverMap.put(hash, resolver);
+    }
+    return resolver;
+  }
+
+  /**
+   * Some files use a temporary naming pattern. Extract the final name from the
+   * temporary name. For example, files *._COPYING_ will be renamed, so we
+   * remove that chunk.
+   *
+   * @param input Input string.
+   * @return Final file name.
+   */
+  @VisibleForTesting
+  public static String extractTempFileName(final String input) {
+    StringBuilder sb = new StringBuilder();
+    Matcher matcher = TEMP_FILE_PATTERN.matcher(input);
+    if (matcher.find()) {
+      for (int i=1; i <= matcher.groupCount(); i++) {
+        String match = matcher.group(i);
+        if (match != null) {
+          sb.append(match);
+        }
+      }
+    }
+    if (sb.length() > 0) {
+      String ret = sb.toString();
+      LOG.debug("Extracted {} from {}", ret, input);
+      return ret;
+    }
+    return input;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f2ee05/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java
new file mode 100644
index 0000000..3508eab
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/LocalResolver.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.net.HostAndPort;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The local subcluster (where the writer is) should be tried first. The writer
+ * is defined from the RPC query received in the RPC server.
+ */
+public class LocalResolver implements OrderedResolver {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LocalResolver.class);
+
+  /** Configuration key to set the minimum time to update the local cache.*/
+  public static final String MIN_UPDATE_PERIOD_KEY =
+      DFSConfigKeys.FEDERATION_ROUTER_PREFIX + "local-resolver.update-period";
+  /** 10 seconds by default. */
+  private static final long MIN_UPDATE_PERIOD_DEFAULT =
+      TimeUnit.SECONDS.toMillis(10);
+
+
+  /** Router service. */
+  private final Router router;
+  /** Minimum update time. */
+  private final long minUpdateTime;
+
+  /** Node IP -> Subcluster. */
+  private Map<String, String> nodeSubcluster = null;
+  /** Last time the subcluster map was updated. */
+  private long lastUpdated;
+
+
+  public LocalResolver(final Configuration conf, final Router routerService) {
+    this.minUpdateTime = conf.getTimeDuration(
+        MIN_UPDATE_PERIOD_KEY, MIN_UPDATE_PERIOD_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.router = routerService;
+  }
+
+  /**
+   * Get the local name space. This relies on the RPC Server to get the address
+   * from the client.
+   *
+   * TODO we only support DN and NN locations, we need to add others like
+   * Resource Managers.
+   *
+   * @param path Path ignored by this policy.
+   * @param loc Federated location with multiple destinations.
+   * @return Local name space. Null if we don't know about this machine.
+   */
+  @Override
+  public String getFirstNamespace(final String path, final PathLocation loc) {
+    String localSubcluster = null;
+    String clientAddr = getClientAddr();
+    Map<String, String> nodeToSubcluster = getSubclusterMappings();
+    if (nodeToSubcluster != null) {
+      localSubcluster = nodeToSubcluster.get(clientAddr);
+      if (localSubcluster != null) {
+        LOG.debug("Local namespace for {} is {}", clientAddr, localSubcluster);
+      } else {
+        LOG.error("Cannot get local namespace for {}", clientAddr);
+      }
+    } else {
+      LOG.error("Cannot get node mapping when resolving {} at {} from {}",
+          path, loc, clientAddr);
+    }
+    return localSubcluster;
+  }
+
+  @VisibleForTesting
+  String getClientAddr() {
+    return Server.getRemoteAddress();
+  }
+
+  /**
+   * Get the mapping from nodes to subcluster. It gets this mapping from the
+   * subclusters through expensive calls (e.g., RPC) and uses caching to avoid
+   * too many calls. The cache might be updated asynchronously to reduce
+   * latency.
+   *
+   * @return Node IP -> Subcluster.
+   */
+  @VisibleForTesting
+  synchronized Map<String, String> getSubclusterMappings() {
+    if (nodeSubcluster == null ||
+        (monotonicNow() - lastUpdated) > minUpdateTime) {
+      // Fetch the mapping asynchronously
+      Thread updater = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          Map<String, String> mapping = new HashMap<>();
+
+          Map<String, String> dnSubcluster = getDatanodesSubcluster();
+          if (dnSubcluster != null) {
+            mapping.putAll(dnSubcluster);
+          }
+
+          Map<String, String> nnSubcluster = getNamenodesSubcluster();
+          if (nnSubcluster != null) {
+            mapping.putAll(nnSubcluster);
+          }
+          nodeSubcluster = mapping;
+          lastUpdated = monotonicNow();
+        }
+      });
+      updater.start();
+
+      // Wait until initialized
+      if (nodeSubcluster == null) {
+        try {
+          LOG.debug("Wait to get the mapping for the first time");
+          updater.join();
+        } catch (InterruptedException e) {
+          LOG.error("Cannot wait for the updater to finish");
+        }
+      }
+    }
+    return nodeSubcluster;
+  }
+
+  /**
+   * Get the Datanode-to-subcluster mapping from the Namenodes. This needs to
+   * be done as a privileged action to use the user for the Router and not the
+   * one from the client in the RPC call.
+   *
+   * @return DN IP -> Subcluster.
+   */
+  private Map<String, String> getDatanodesSubcluster() {
+
+    final RouterRpcServer rpcServer = getRpcServer();
+    if (rpcServer == null) {
+      LOG.error("Cannot access the Router RPC server");
+      return null;
+    }
+
+    Map<String, String> ret = new HashMap<>();
+    try {
+      // We need to get the DNs as a privileged user
+      UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+      Map<String, DatanodeStorageReport[]> dnMap = loginUser.doAs(
+          new PrivilegedAction<Map<String, DatanodeStorageReport[]>>() {
+            @Override
+            public Map<String, DatanodeStorageReport[]> run() {
+              try {
+                return rpcServer.getDatanodeStorageReportMap(
+                    DatanodeReportType.ALL);
+              } catch (IOException e) {
+                LOG.error("Cannot get the datanodes from the RPC server", e);
+                return null;
+              }
+            }
+          });
+      for (Entry<String, DatanodeStorageReport[]> entry : dnMap.entrySet()) {
+        String nsId = entry.getKey();
+        DatanodeStorageReport[] dns = entry.getValue();
+        for (DatanodeStorageReport dn : dns) {
+          DatanodeInfo dnInfo = dn.getDatanodeInfo();
+          String ipAddr = dnInfo.getIpAddr();
+          ret.put(ipAddr, nsId);
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Cannot get Datanodes from the Namenodes: {}", e.getMessage());
+    }
+    return ret;
+  }
+
+  /**
+   * Get the Namenode-to-subcluster mapping from the Membership store. As the
+   * Routers are usually co-located with Namenodes, we also check for the
+   * local address for this Router here.
+   *
+   * @return NN IP -> Subcluster.
+   */
+  private Map<String, String> getNamenodesSubcluster() {
+
+    final MembershipStore membershipStore = getMembershipStore();
+    if (membershipStore == null) {
+      LOG.error("Cannot access the Membership store");
+      return null;
+    }
+
+    // Manage requests from this hostname (127.0.0.1)
+    String localIp = "127.0.0.1";
+    String localHostname = localIp;
+    try {
+      localHostname = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      LOG.error("Cannot get local host name");
+    }
+
+    Map<String, String> ret = new HashMap<>();
+    try {
+      // Get the values from the store
+      GetNamenodeRegistrationsRequest request =
+          GetNamenodeRegistrationsRequest.newInstance();
+      GetNamenodeRegistrationsResponse response =
+          membershipStore.getNamenodeRegistrations(request);
+      final List<MembershipState> nns = response.getNamenodeMemberships();
+      for (MembershipState nn : nns) {
+        try {
+          String nsId = nn.getNameserviceId();
+          String rpcAddress = nn.getRpcAddress();
+          String hostname = HostAndPort.fromString(rpcAddress).getHostText();
+          ret.put(hostname, nsId);
+          if (hostname.equals(localHostname)) {
+            ret.put(localIp, nsId);
+          }
+
+          InetAddress addr = InetAddress.getByName(hostname);
+          String ipAddr = addr.getHostAddress();
+          ret.put(ipAddr, nsId);
+        } catch (Exception e) {
+          LOG.error("Cannot get address for {}: {}", nn, e.getMessage());
+        }
+      }
+    } catch (IOException ioe) {
+      LOG.error("Cannot get Namenodes from the State Store: {}",
+          ioe.getMessage());
+    }
+    return ret;
+  }
+
+  /**
+   * Get the Router RPC server.
+   *
+   * @return Router RPC server. Null if not possible.
+   */
+  private RouterRpcServer getRpcServer() {
+    if (this.router == null) {
+      return null;
+    }
+    return router.getRpcServer();
+  }
+
+  /**
+   * Get the Membership store.
+   *
+   * @return Membership store.
+   */
+  private MembershipStore getMembershipStore() {
+    StateStoreService stateStore = router.getStateStore();
+    if (stateStore == null) {
+      return null;
+    }
+    return stateStore.getRegisteredRecordStore(MembershipStore.class);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f2ee05/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/OrderedResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/OrderedResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/OrderedResolver.java
new file mode 100644
index 0000000..3a3ccf7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/OrderedResolver.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+
+
+/**
+ * Policy that decides which namespace should be accessed first when a
+ * federated location has multiple destinations.
+ */
+public interface OrderedResolver {
+
+  /**
+   * Get the first namespace based on this resolver approach.
+   *
+   * @param path Path to check; a policy may ignore it (RandomResolver does).
+   * @param loc Federated location with multiple destinations.
+   * @return First namespace out of the locations; may be null if none found.
+   */
+  String getFirstNamespace(String path, PathLocation loc);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f2ee05/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RandomResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RandomResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RandomResolver.java
new file mode 100644
index 0000000..022aa48
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/RandomResolver.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Order the destinations randomly.
+ */
+public class RandomResolver implements OrderedResolver {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RandomResolver.class);
+
+  /** Random number generator. */
+  private static final Random RANDOM = new Random();
+
+  /**
+   * Get a random name space out of the ones in the location.
+   *
+   * @param path Path ignored by this policy.
+   * @param loc Federated location with multiple destinations.
+   * @return Random name space. Null if there is none to choose from.
+   */
+  @Override
+  public String getFirstNamespace(final String path, final PathLocation loc) {
+    if (loc == null) {
+      return null;
+    }
+    Set<String> namespaces = loc.getNamespaces();
+    if (namespaces == null || namespaces.isEmpty()) {
+      LOG.error("Cannot get namespaces for {}", loc);
+      return null;
+    }
+    List<String> nssList = new ArrayList<>(namespaces);
+    int index = RANDOM.nextInt(nssList.size());
+    return nssList.get(index);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f2ee05/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index e0dfeb4..d282a7d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -1204,31 +1204,56 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
       DatanodeReportType type) throws IOException {
     checkOperation(OperationCategory.UNCHECKED);
 
-    Map<String, DatanodeStorageReport> datanodesMap = new HashMap<>();
-    RemoteMethod method = new RemoteMethod("getDatanodeStorageReport",
-        new Class<?>[] {DatanodeReportType.class}, type);
-    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    Map<FederationNamespaceInfo, DatanodeStorageReport[]> results =
-        rpcClient.invokeConcurrent(
-            nss, method, true, false, DatanodeStorageReport[].class);
-    for (DatanodeStorageReport[] result : results.values()) {
-      for (DatanodeStorageReport node : result) {
-        String nodeId = node.getDatanodeInfo().getXferAddr();
+    Map<String, DatanodeStorageReport[]> dnSubcluster =
+        getDatanodeStorageReportMap(type);
+
+    // Avoid repeating machines in multiple subclusters
+    Map<String, DatanodeStorageReport> datanodesMap = new LinkedHashMap<>();
+    for (DatanodeStorageReport[] dns : dnSubcluster.values()) {
+      for (DatanodeStorageReport dn : dns) {
+        DatanodeInfo dnInfo = dn.getDatanodeInfo();
+        String nodeId = dnInfo.getXferAddr();
         if (!datanodesMap.containsKey(nodeId)) {
-          datanodesMap.put(nodeId, node);
+          datanodesMap.put(nodeId, dn);
         }
         // TODO merge somehow, right now it just takes the first one
       }
     }
 
     Collection<DatanodeStorageReport> datanodes = datanodesMap.values();
-    // TODO sort somehow
     DatanodeStorageReport[] combinedData =
         new DatanodeStorageReport[datanodes.size()];
     combinedData = datanodes.toArray(combinedData);
     return combinedData;
   }
 
+  /**
+   * Collect the datanode storage reports of each subcluster.
+   *
+   * @param type Type of the datanodes to get.
+   * @return Map from nameservice identifier to its datanode reports.
+   * @throws IOException If the reports cannot be retrieved.
+   */
+  public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMap(
+      DatanodeReportType type) throws IOException {
+
+    Class<?>[] paramTypes = new Class<?>[] {DatanodeReportType.class};
+    RemoteMethod remoteCall =
+        new RemoteMethod("getDatanodeStorageReport", paramTypes, type);
+    Set<FederationNamespaceInfo> namespaces = namenodeResolver.getNamespaces();
+    Map<FederationNamespaceInfo, DatanodeStorageReport[]> reports =
+        rpcClient.invokeConcurrent(namespaces, remoteCall,
+            true, false, DatanodeStorageReport[].class);
+
+    // Re-key the reports by nameservice identifier
+    Map<String, DatanodeStorageReport[]> byNsId = new LinkedHashMap<>();
+    for (Entry<FederationNamespaceInfo, DatanodeStorageReport[]> report :
+        reports.entrySet()) {
+      byNsId.put(report.getKey().getNameserviceId(), report.getValue());
+    }
+    return byNsId;
+  }
+
   @Override // ClientProtocol
   public boolean setSafeMode(SafeModeAction action, boolean isChecked)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f2ee05/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
index 3beeca3..9667489 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
@@ -295,6 +295,8 @@ public class MountTablePBImpl extends MountTable implements PBRecord {
       return DestinationOrder.LOCAL;
     case RANDOM:
       return DestinationOrder.RANDOM;
+    case HASH_ALL:
+      return DestinationOrder.HASH_ALL;
     default:
       return DestinationOrder.HASH;
     }
@@ -306,6 +308,8 @@ public class MountTablePBImpl extends MountTable implements PBRecord {
       return DestOrder.LOCAL;
     case RANDOM:
       return DestOrder.RANDOM;
+    case HASH_ALL:
+      return DestOrder.HASH_ALL;
     default:
       return DestOrder.HASH;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f2ee05/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/ConsistentHashRing.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/ConsistentHashRing.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/ConsistentHashRing.java
new file mode 100644
index 0000000..89273db
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/ConsistentHashRing.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.io.MD5Hash;
+
+/**
+ * Consistent hash ring to distribute items across nodes (locations). If we add
+ * or remove nodes, it minimizes the item migration.
+ */
+public class ConsistentHashRing {
+  private static final String SEPARATOR = "/";
+  private static final String VIRTUAL_NODE_FORMAT = "%s" + SEPARATOR + "%d";
+
+  /** Hash ring: hash of a virtual node to the name of that virtual node. */
+  private final SortedMap<String, String> ring = new TreeMap<>();
+  /** Entry to number of virtual nodes it has on the ring. */
+  private final Map<String, Integer> entryToVirtualNodes = new HashMap<>();
+
+  /** Synchronization. */
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
+  /** Create a ring with 100 virtual nodes for each of the locations. */
+  public ConsistentHashRing(Set<String> locations) {
+    for (String location : locations) {
+      addLocation(location);
+    }
+  }
+
+  /**
+   * Add entry to consistent hash ring with 100 virtual nodes.
+   *
+   * @param location Node to add to the ring.
+   */
+  public void addLocation(String location) {
+    addLocation(location, 100);
+  }
+
+  /**
+   * Add entry to consistent hash ring, replacing it if already present.
+   *
+   * @param location Node to add to the ring.
+   * @param numVirtualNodes Number of virtual nodes to add.
+   */
+  public void addLocation(String location, int numVirtualNodes) {
+    writeLock.lock();
+    try {
+      // Drop virtual nodes from an earlier add (write lock is reentrant)
+      removeLocation(location);
+      entryToVirtualNodes.put(location, numVirtualNodes);
+      for (int i = 0; i < numVirtualNodes; i++) {
+        String key = String.format(VIRTUAL_NODE_FORMAT, location, i);
+        ring.put(getHash(key), key);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Remove specified entry from hash ring. No-op if it is not in the ring.
+   *
+   * @param location Node to remove from the ring.
+   */
+  public void removeLocation(String location) {
+    writeLock.lock();
+    try {
+      Integer numVirtualNodes = entryToVirtualNodes.remove(location);
+      int count = (numVirtualNodes == null) ? 0 : numVirtualNodes;
+      for (int i = 0; i < count; i++) {
+        String key = String.format(VIRTUAL_NODE_FORMAT, location, i);
+        ring.remove(getHash(key));
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Return location (owner) of specified item. Owner is the first entry on
+   * the ring with a hash {@code >=} the hash of the item, wrapping around.
+   * @param item Item to look for.
+   * @return The location of the item. Null if the ring is empty.
+   */
+  public String getLocation(String item) {
+    readLock.lock();
+    try {
+      if (ring.isEmpty()) {
+        return null;
+      }
+      SortedMap<String, String> tailMap = ring.tailMap(getHash(item));
+      String hash = tailMap.isEmpty() ? ring.firstKey() : tailMap.firstKey();
+      String virtualNode = ring.get(hash);
+      int index = virtualNode.lastIndexOf(SEPARATOR);
+      return (index >= 0) ? virtualNode.substring(0, index) : virtualNode;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /** Hash that places a key on the ring: string form of its MD5 digest. */
+  public String getHash(String key) {
+    return MD5Hash.digest(key).toString();
+  }
+
+  /**
+   * Get the locations in the ring.
+   * @return Snapshot of the set of locations in the ring.
+   */
+  public Set<String> getLocations() {
+    readLock.lock();
+    try {
+      return new HashMap<String, Integer>(entryToVirtualNodes).keySet();
+    } finally {
+      readLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f2ee05/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/package-info.java
new file mode 100644
index 0000000..7149675
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/package-info.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Assorted utility classes and helpers for HDFS Federation.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+
+package org.apache.hadoop.hdfs.server.federation.utils;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f2ee05/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
index 8e9c7af..9dfd1b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
@@ -88,7 +88,8 @@ public class RouterAdmin extends Configured implements Tool {
   public void printUsage() {
     String usage = "Federation Admin Tools:\n"
         + "\t[-add <source> <nameservice> <destination> "
-        + "[-readonly] -owner <owner> -group <group> -mode <mode>]\n"
+        + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+        + "-owner <owner> -group <group> -mode <mode>]\n"
         + "\t[-rm <source>]\n"
         + "\t[-ls <path>]\n"
         + "\t[-setQuota <path> -nsQuota <nsQuota> -ssQuota "

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f2ee05/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto
index 2b285db..b0d6982 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto
@@ -129,6 +129,7 @@ message MountTableRecordProto {
     HASH = 0;
     LOCAL = 1;
     RANDOM = 2;
+    HASH_ALL = 3;
   }
   optional DestOrder destOrder = 6 [default = HASH];
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f2ee05/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java
index f530fe9..e38443e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java
@@ -497,9 +497,9 @@ public class TestMountTableResolver {
             map2);
     entries.add(entry2);
     mountTable.refreshEntries(entries);
-    assertEquals("1->/testlocationcache/",
+    assertEquals("1->/testlocationcache",
             mountTable.getDestinationForPath("/testlocationcache").toString());
-    assertEquals("2->/anothertestlocationcache/",
+    assertEquals("2->/anothertestlocationcache",
             mountTable.getDestinationForPath("/anothertestlocationcache")
                     .toString());
 
@@ -518,7 +518,7 @@ public class TestMountTableResolver {
     mountTable.refreshEntries(entries);
 
     // Ensure location cache update correctly
-    assertEquals("3->/testlocationcache/",
+    assertEquals("3->/testlocationcache",
             mountTable.getDestinationForPath("/testlocationcache").toString());
 
     // Cleanup before exit

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f2ee05/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMultipleDestinationResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMultipleDestinationResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMultipleDestinationResolver.java
new file mode 100644
index 0000000..3915c56
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMultipleDestinationResolver.java
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver;
+
+import static org.apache.hadoop.hdfs.server.federation.resolver.order.HashResolver.extractTempFileName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the multiple destination resolver.
+ */
+public class TestMultipleDestinationResolver {
+
+  private MultipleDestinationMountTableResolver resolver;
+
+  @Before
+  public void setup() throws IOException {
+    Configuration conf = new Configuration();
+    resolver = new MultipleDestinationMountTableResolver(conf, null);
+
+    // Single destination: /tmp only goes to subcluster0
+    Map<String, String> map1 = new HashMap<>();
+    map1.put("subcluster0", "/tmp");
+    resolver.addEntry(MountTable.newInstance("/tmp", map1));
+
+    // / goes to subcluster0,1,2 with no explicit order (default is hash)
+    Map<String, String> mapDefault = new HashMap<>();
+    mapDefault.put("subcluster0", "/");
+    mapDefault.put("subcluster1", "/");
+    mapDefault.put("subcluster2", "/");
+    MountTable defaultEntry = MountTable.newInstance("/", mapDefault);
+    resolver.addEntry(defaultEntry);
+
+    // /hash goes to subcluster0,1,2 with the HASH order
+    Map<String, String> mapHash = new HashMap<>();
+    mapHash.put("subcluster0", "/hash");
+    mapHash.put("subcluster1", "/hash");
+    mapHash.put("subcluster2", "/hash");
+    MountTable hashEntry = MountTable.newInstance("/hash", mapHash);
+    hashEntry.setDestOrder(DestinationOrder.HASH);
+    resolver.addEntry(hashEntry);
+
+    // /hashall goes to subcluster0,1,2 with the HASH_ALL order (full tree)
+    Map<String, String> mapHashAll = new HashMap<>();
+    mapHashAll.put("subcluster0", "/hashall");
+    mapHashAll.put("subcluster1", "/hashall");
+    mapHashAll.put("subcluster2", "/hashall");
+    MountTable hashEntryAll = MountTable.newInstance("/hashall", mapHashAll);
+    hashEntryAll.setDestOrder(DestinationOrder.HASH_ALL);
+    resolver.addEntry(hashEntryAll);
+
+    // /local goes to subcluster0,1,2 with the LOCAL order
+    Map<String, String> mapLocal = new HashMap<>();
+    mapLocal.put("subcluster0", "/local");
+    mapLocal.put("subcluster1", "/local");
+    mapLocal.put("subcluster2", "/local");
+    MountTable localEntry = MountTable.newInstance("/local", mapLocal);
+    localEntry.setDestOrder(DestinationOrder.LOCAL);
+    resolver.addEntry(localEntry);
+
+    // /random goes to subcluster0,1,2 with the RANDOM order
+    Map<String, String> mapRandom = new HashMap<>();
+    mapRandom.put("subcluster0", "/random");
+    mapRandom.put("subcluster1", "/random");
+    mapRandom.put("subcluster2", "/random");
+    MountTable randomEntry = MountTable.newInstance("/random", mapRandom);
+    randomEntry.setDestOrder(DestinationOrder.RANDOM);
+    resolver.addEntry(randomEntry);
+
+    // Read only mount point in subcluster0,1,2 (no explicit order set)
+    Map<String, String> mapReadOnly = new HashMap<>();
+    mapReadOnly.put("subcluster0", "/readonly");
+    mapReadOnly.put("subcluster1", "/readonly");
+    mapReadOnly.put("subcluster2", "/readonly");
+    MountTable readOnlyEntry = MountTable.newInstance("/readonly", mapReadOnly);
+    readOnlyEntry.setReadOnly(true);
+    resolver.addEntry(readOnlyEntry);
+  }
+
+  @Test
+  public void testHashEqualDistribution() throws IOException {
+    // HASH mount (first level); 'false' presumably expects no spread - confirm
+    testEvenDistribution("/hash");
+    testEvenDistribution("/hash/folder0", false);
+
+    // HASH_ALL mount (all levels): checked at the mount and inside a folder
+    testEvenDistribution("/hashall");
+    testEvenDistribution("/hashall/folder0");
+  }
+
+  @Test
+  public void testHashAll() throws IOException {
+    // HASH_ALL: files right under the mount point spread across subclusters
+    PathLocation dest0 = resolver.getDestinationForPath("/hashall/file0.txt");
+    assertDest("subcluster0", dest0);
+    PathLocation dest1 = resolver.getDestinationForPath("/hashall/file1.txt");
+    assertDest("subcluster1", dest1);
+
+    // Files within a folder also spread, independently of the folder itself
+    PathLocation dest2 = resolver.getDestinationForPath("/hashall/folder0");
+    assertDest("subcluster2", dest2);
+    PathLocation dest3 = resolver.getDestinationForPath(
+        "/hashall/folder0/file0.txt");
+    assertDest("subcluster1", dest3);
+    PathLocation dest4 = resolver.getDestinationForPath(
+        "/hashall/folder0/file1.txt");
+    assertDest("subcluster0", dest4);
+
+    PathLocation dest5 = resolver.getDestinationForPath(
+        "/hashall/folder0/folder0/file0.txt");
+    assertDest("subcluster1", dest5);
+    PathLocation dest6 = resolver.getDestinationForPath(
+        "/hashall/folder0/folder0/file1.txt");
+    assertDest("subcluster1", dest6);
+    PathLocation dest7 = resolver.getDestinationForPath(
+        "/hashall/folder0/folder0/file2.txt");
+    assertDest("subcluster0", dest7);
+
+    PathLocation dest8 = resolver.getDestinationForPath("/hashall/folder1");
+    assertDest("subcluster1", dest8);
+    PathLocation dest9 = resolver.getDestinationForPath(
+        "/hashall/folder1/file0.txt");
+    assertDest("subcluster0", dest9);
+    PathLocation dest10 = resolver.getDestinationForPath(
+        "/hashall/folder1/file1.txt");
+    assertDest("subcluster1", dest10);
+
+    PathLocation dest11 = resolver.getDestinationForPath("/hashall/folder2");
+    assertDest("subcluster2", dest11);
+    PathLocation dest12 = resolver.getDestinationForPath(
+        "/hashall/folder2/file0.txt");
+    assertDest("subcluster0", dest12);
+    PathLocation dest13 = resolver.getDestinationForPath(
+        "/hashall/folder2/file1.txt");
+    assertDest("subcluster0", dest13);
+    PathLocation dest14 = resolver.getDestinationForPath(
+        "/hashall/folder2/file2.txt");
+    assertDest("subcluster1", dest14);
+  }
+
+  @Test
+  public void testHashFirst() throws IOException {
+    PathLocation dest0 = resolver.getDestinationForPath("/hashall/file0.txt");
+    assertDest("subcluster0", dest0);
+    PathLocation dest1 = resolver.getDestinationForPath("/hashall/file1.txt");
+    assertDest("subcluster1", dest1);
+
+    // HASH: everything under /hash/folder0 must be in subcluster0
+    PathLocation dest2 = resolver.getDestinationForPath("/hash/folder0");
+    assertDest("subcluster0", dest2);
+    PathLocation dest3 = resolver.getDestinationForPath(
+        "/hash/folder0/file0.txt");
+    assertDest("subcluster0", dest3);
+    PathLocation dest4 = resolver.getDestinationForPath(
+        "/hash/folder0/file1.txt");
+    assertDest("subcluster0", dest4);
+
+    PathLocation dest5 = resolver.getDestinationForPath(
+        "/hash/folder0/folder0/file0.txt");
+    assertDest("subcluster0", dest5);
+    PathLocation dest6 = resolver.getDestinationForPath(
+        "/hash/folder0/folder0/file1.txt");
+    assertDest("subcluster0", dest6);
+
+    // Everything under /hash/folder1 must be in subcluster2
+    PathLocation dest7 = resolver.getDestinationForPath("/hash/folder1");
+    assertDest("subcluster2", dest7);
+    PathLocation dest8 = resolver.getDestinationForPath(
+        "/hash/folder1/file0.txt");
+    assertDest("subcluster2", dest8);
+    PathLocation dest9 = resolver.getDestinationForPath(
+        "/hash/folder1/file1.txt");
+    assertDest("subcluster2", dest9);
+
+    // Everything under /hash/folder2 must be in subcluster2 as well
+    PathLocation dest10 = resolver.getDestinationForPath("/hash/folder2");
+    assertDest("subcluster2", dest10);
+    PathLocation dest11 = resolver.getDestinationForPath(
+        "/hash/folder2/file0.txt");
+    assertDest("subcluster2", dest11);
+    PathLocation dest12 = resolver.getDestinationForPath(
+        "/hash/folder2/file1.txt");
+    assertDest("subcluster2", dest12);
+  }
+
+  @Test
+  public void testRandomEqualDistribution() throws IOException {
+    testEvenDistribution("/random"); // mount point with the RANDOM order
+  }
+
+  @Test
+  public void testSingleDestination() throws IOException {
+    // /tmp only has subcluster0 as destination and paths are kept as they are
+    for (int i = 0; i < 100; i++) {
+      final String path = "/tmp/b/c/file" + i + ".txt";
+      final RemoteLocation defaultLoc =
+          resolver.getDestinationForPath(path).getDefaultLocation();
+      assertEquals("subcluster0", defaultLoc.getNameserviceId());
+      assertEquals(path, defaultLoc.getDest());
+    }
+  }
+
+  @Test
+  public void testResolveSubdirectories() throws Exception {
+    // Random folder (not a mount point itself) with two files inside
+    Random rnd = new Random();
+    String folder = "/sort/testdir" + rnd.nextInt();
+    String firstFile = folder + "/file1" + rnd.nextInt();
+    String secondFile = folder + "/file2" + rnd.nextInt();
+
+    // Namespace the folder itself resolves to
+    String folderNs = resolver.getDestinationForPath(folder)
+        .getDefaultLocation().getNameserviceId();
+
+    // Both files must resolve to that same namespace
+    PathLocation firstLoc = resolver.getDestinationForPath(firstFile);
+    String firstNs = firstLoc.getDefaultLocation().getNameserviceId();
+    assertEquals(folderNs, firstNs);
+
+    PathLocation secondLoc = resolver.getDestinationForPath(secondFile);
+    String secondNs = secondLoc.getDefaultLocation().getNameserviceId();
+    assertEquals(folderNs, secondNs);
+  }
+
+  @Test
+  public void testExtractTempFileName() {
+    for (String teststring : new String[] {
+        "testfile1.txt.COPYING",
+        "testfile1.txt._COPYING_",
+        "testfile1.txt._COPYING_.attempt_1486662804109_0055_m_000042_0",
+        "testfile1.txt.tmp",
+        "_temp/testfile1.txt",
+        "_temporary/testfile1.txt.af77e2ab-4bc5-4959-ae08-299c880ee6b8",
+        "_temporary/0/_temporary/attempt_201706281636_0007_m_000003_46/" +
+          "testfile1.txt" }) {
+      String finalName = extractTempFileName(teststring);
+      assertEquals("testfile1.txt", finalName);
+    }
+
+    // False cases: names that only resemble temp files must stay unchanged
+    assertEquals(
+        "file1.txt.COPYING1", extractTempFileName("file1.txt.COPYING1"));
+    assertEquals("file1.txt.tmp2", extractTempFileName("file1.txt.tmp2"));
+
+    // Speculation patterns: _temporary folders and attempt/UUID parts go away
+    String finalName = extractTempFileName(
+        "_temporary/part-00007.af77e2ab-4bc5-4959-ae08-299c880ee6b8");
+    assertEquals("part-00007", finalName);
+    finalName = extractTempFileName(
+        "_temporary/0/_temporary/attempt_201706281636_0007_m_000003_46/" +
+          "part-00003");
+    assertEquals("part-00003", finalName);
+
+    // Subfolders: regular parent folders are kept in the final name
+    finalName = extractTempFileName("folder0/testfile1.txt._COPYING_");
+    assertEquals("folder0/testfile1.txt", finalName);
+    finalName = extractTempFileName(
+        "folder0/folder1/testfile1.txt._COPYING_");
+    assertEquals("folder0/folder1/testfile1.txt", finalName);
+    finalName = extractTempFileName(
+        "processedHrsData.txt/_temporary/0/_temporary/" +
+        "attempt_201706281636_0007_m_000003_46/part-00003");
+    assertEquals("processedHrsData.txt/part-00003", finalName);
+  }
+
+  @Test
+  public void testReadOnly() throws IOException {
+    MountTable mount = resolver.getMountPoint("/readonly");
+    assertTrue(mount.isReadOnly());
+
+    PathLocation dest0 = resolver.getDestinationForPath("/readonly/file0.txt");
+    assertDest("subcluster1", dest0);
+    PathLocation dest1 = resolver.getDestinationForPath("/readonly/file1.txt");
+    assertDest("subcluster2", dest1);
+
+    // All these must be in the same location: subcluster1
+    PathLocation dest2 = resolver.getDestinationForPath("/readonly/folder0");
+    assertDest("subcluster1", dest2);
+    PathLocation dest3 = resolver.getDestinationForPath(
+        "/readonly/folder0/file0.txt");
+    assertDest("subcluster1", dest3);
+    PathLocation dest4 = resolver.getDestinationForPath(
+        "/readonly/folder0/file1.txt");
+    assertDest("subcluster1", dest4);
+
+    PathLocation dest5 = resolver.getDestinationForPath(
+        "/readonly/folder0/folder0/file0.txt");
+    assertDest("subcluster1", dest5);
+    PathLocation dest6 = resolver.getDestinationForPath(
+        "/readonly/folder0/folder0/file1.txt");
+    assertDest("subcluster1", dest6);
+
+    // All these must be in the same location: subcluster2
+    PathLocation dest7 = resolver.getDestinationForPath("/readonly/folder1");
+    assertDest("subcluster2", dest7);
+    PathLocation dest8 = resolver.getDestinationForPath(
+        "/readonly/folder1/file0.txt");
+    assertDest("subcluster2", dest8);
+    PathLocation dest9 = resolver.getDestinationForPath(
+        "/readonly/folder1/file1.txt");
+    assertDest("subcluster2", dest9);
+
+    // All these must be in the same location: subcluster1
+    PathLocation dest10 = resolver.getDestinationForPath("/readonly/folder2");
+    assertDest("subcluster1", dest10);
+    PathLocation dest11 = resolver.getDestinationForPath(
+        "/readonly/folder2/file0.txt");
+    assertDest("subcluster1", dest11);
+    PathLocation dest12 = resolver.getDestinationForPath(
+        "/readonly/folder2/file1.txt");
+    assertDest("subcluster1", dest12);
+  }
+
+  @Test
+  public void testLocalResolver() throws IOException {
+    PathLocation dest0 =
+        resolver.getDestinationForPath("/local/folder0/file0.txt");
+    assertDest("subcluster0", dest0);
+  }
+
+  @Test
+  public void testRandomResolver() throws IOException {
+    Set<String> destinations = new HashSet<>();
+    for (int i = 0; i < 30; i++) {
+      PathLocation dest =
+          resolver.getDestinationForPath("/random/folder0/file0.txt");
+      RemoteLocation firstDest = dest.getDestinations().get(0);
+      String nsId = firstDest.getNameserviceId();
+      destinations.add(nsId);
+    }
+    assertEquals(3, destinations.size());
+  }
+
+  /**
+   * Test that a path has files distributed across destinations evenly.
+   * @param path Path to check.
+   * @throws IOException If it cannot check it.
+   */
+  private void testEvenDistribution(final String path) throws IOException {
+    testEvenDistribution(path, true);
+  }
+
+  /**
+   * Test that a path has files distributed across destinations evenly or not.
+   * @param path Path to check.
+   * @param even If the distribution should be even or not.
+   * @throws IOException If it cannot check it.
+   */
+  private void testEvenDistribution(final String path, final boolean even)
+      throws IOException {
+
+    // Subcluster -> Files
+    Map<String, Set<String>> results = new HashMap<>();
+    for (int f = 0; f < 10000; f++) {
+      String filename = path + "/file" + f + ".txt";
+      PathLocation destination = resolver.getDestinationForPath(filename);
+      RemoteLocation loc = destination.getDefaultLocation();
+      assertEquals(filename, loc.getDest());
+
+      String nsId = loc.getNameserviceId();
+      if (!results.containsKey(nsId)) {
+        results.put(nsId, new TreeSet<>());
+      }
+      results.get(nsId).add(filename);
+    }
+
+    if (!even) {
+      // All files should be in one subcluster
+      assertEquals(1, results.size());
+    } else {
+      // Files should be distributed somewhat evenly
+      assertEquals(3, results.size());
+      int count = 0;
+      for (Set<String> files : results.values()) {
+        count = count + files.size();
+      }
+      int avg = count / results.keySet().size();
+      for (Set<String> files : results.values()) {
+        int filesCount = files.size();
+        // Check that the count in each namespace is within 20% of avg
+        assertTrue(filesCount > 0);
+        assertTrue(Math.abs(filesCount - avg) < (avg / 5));
+      }
+    }
+  }
+
+  private static void assertDest(String expectedDest, PathLocation loc) {
+    assertEquals(expectedDest, loc.getDestinations().get(0).getNameserviceId());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f2ee05/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java
new file mode 100644
index 0000000..42ede62
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/order/TestLocalResolver.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.resolver.order;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test the {@link LocalResolver}.
+ */
+public class TestLocalResolver {
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testLocalResolver() throws IOException {
+
+    // Mock the subcluster mapping
+    Configuration conf = new Configuration();
+    Router router = mock(Router.class);
+    StateStoreService stateStore = mock(StateStoreService.class);
+    MembershipStore membership = mock(MembershipStore.class);
+    when(router.getStateStore()).thenReturn(stateStore);
+    when(stateStore.getRegisteredRecordStore(any(Class.class)))
+        .thenReturn(membership);
+    GetNamenodeRegistrationsResponse response =
+        GetNamenodeRegistrationsResponse.newInstance();
+    // Set the mapping for each client
+    List<MembershipState> records = new LinkedList<>();
+    records.add(newMembershipState("client0", "subcluster0"));
+    records.add(newMembershipState("client1", "subcluster1"));
+    records.add(newMembershipState("client2", "subcluster2"));
+    response.setNamenodeMemberships(records);
+    when(membership.getNamenodeRegistrations(
+        any(GetNamenodeRegistrationsRequest.class))).thenReturn(response);
+
+    // Mock the client resolution: it will be anything in sb
+    StringBuilder sb = new StringBuilder("clientX");
+    LocalResolver localResolver = new LocalResolver(conf, router);
+    LocalResolver spyLocalResolver = spy(localResolver);
+    doAnswer(new Answer<String>() {
+      @Override
+      public String answer(InvocationOnMock invocation) throws Throwable {
+        return sb.toString();
+      }
+    }).when(spyLocalResolver).getClientAddr();
+
+    // Add the mocks to the resolver
+    MultipleDestinationMountTableResolver resolver =
+        new MultipleDestinationMountTableResolver(conf, router);
+    resolver.addResolver(DestinationOrder.LOCAL, spyLocalResolver);
+
+
+    // We point /local to subclusters 0, 1, 2 with the local order
+    Map<String, String> mapLocal = new HashMap<>();
+    mapLocal.put("subcluster0", "/local");
+    mapLocal.put("subcluster1", "/local");
+    mapLocal.put("subcluster2", "/local");
+    MountTable localEntry = MountTable.newInstance("/local", mapLocal);
+    localEntry.setDestOrder(DestinationOrder.LOCAL);
+    resolver.addEntry(localEntry);
+
+    // Test first with the default destination
+    PathLocation dest = resolver.getDestinationForPath("/local/file0.txt");
+    assertDestination("subcluster0", dest);
+
+    // We change the client location and verify
+    setClient(sb, "client2");
+    dest = resolver.getDestinationForPath("/local/file0.txt");
+    assertDestination("subcluster2", dest);
+
+    setClient(sb, "client1");
+    dest = resolver.getDestinationForPath("/local/file0.txt");
+    assertDestination("subcluster1", dest);
+
+    setClient(sb, "client0");
+    dest = resolver.getDestinationForPath("/local/file0.txt");
+    assertDestination("subcluster0", dest);
+  }
+
+  private void assertDestination(String expectedNsId, PathLocation loc) {
+    List<RemoteLocation> dests = loc.getDestinations();
+    RemoteLocation dest = dests.get(0);
+    assertEquals(expectedNsId, dest.getNameserviceId());
+  }
+
+  private MembershipState newMembershipState(String addr, String nsId) {
+    return MembershipState.newInstance(
+        "routerId", nsId, "nn0", "cluster0", "blockPool0",
+        addr + ":8001", addr + ":8002", addr + ":8003", addr + ":8004",
+        FederationNamenodeServiceState.ACTIVE, false);
+  }
+
+  /**
+   * Set the address of the client issuing the request. We use a StringBuilder
+   * to modify the value in place for the mock.
+   * @param sb StringBuilder to set the client string.
+   * @param client Address of the client.
+   */
+  private static void setClient(StringBuilder sb, String client) {
+    sb.replace(0, sb.length(), client);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f2ee05/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java
index 3271d56..a8ffded 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
 import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntr
 import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.apache.hadoop.util.Time;
@@ -172,6 +174,52 @@ public class TestRouterAdmin {
     MountTable record = getMountTableEntry("/readonly");
     assertEquals("/readonly", record.getSourcePath());
     assertTrue(record.isReadOnly());
+
+    // Removing the new entry
+    RemoveMountTableEntryRequest removeRequest =
+        RemoveMountTableEntryRequest.newInstance("/readonly");
+    RemoveMountTableEntryResponse removeResponse =
+        mountTable.removeMountTableEntry(removeRequest);
+    assertTrue(removeResponse.getStatus());
+  }
+
+  @Test
+  public void testAddOrderMountTable() throws IOException {
+    testAddOrderMountTable(DestinationOrder.HASH);
+    testAddOrderMountTable(DestinationOrder.LOCAL);
+    testAddOrderMountTable(DestinationOrder.RANDOM);
+    testAddOrderMountTable(DestinationOrder.HASH_ALL);
+  }
+
+  private void testAddOrderMountTable(final DestinationOrder order)
+      throws IOException {
+    final String mnt = "/" + order;
+    MountTable newEntry = MountTable.newInstance(
+        mnt, Collections.singletonMap("ns0", "/testdir"),
+        Time.now(), Time.now());
+    newEntry.setDestOrder(order);
+
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTable = client.getMountTableManager();
+
+    // Add
+    AddMountTableEntryRequest addRequest;
+    AddMountTableEntryResponse addResponse;
+    addRequest = AddMountTableEntryRequest.newInstance(newEntry);
+    addResponse = mountTable.addMountTableEntry(addRequest);
+    assertTrue(addResponse.getStatus());
+
+    // Check that the entry was stored with the requested order
+    MountTable record = getMountTableEntry(mnt);
+    assertEquals(mnt, record.getSourcePath());
+    assertEquals(order, record.getDestOrder());
+
+    // Removing the new entry
+    RemoveMountTableEntryRequest removeRequest =
+        RemoveMountTableEntryRequest.newInstance(mnt);
+    RemoveMountTableEntryResponse removeResponse =
+        mountTable.removeMountTableEntry(removeRequest);
+    assertTrue(removeResponse.getStatus());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13f2ee05/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
index 161e613..20353c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayOutputStream;
+
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.util.List;
@@ -32,7 +33,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
 import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
 import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
 import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
@@ -48,6 +51,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.base.Supplier;
+
 /**
  * Tests Router admin commands.
  */
@@ -142,6 +146,36 @@ public class TestRouterAdminCLI {
   }
 
   @Test
+  public void testAddOrderMountTable() throws Exception {
+    testAddOrderMountTable(DestinationOrder.HASH);
+    testAddOrderMountTable(DestinationOrder.LOCAL);
+    testAddOrderMountTable(DestinationOrder.RANDOM);
+    testAddOrderMountTable(DestinationOrder.HASH_ALL);
+  }
+
+  private void testAddOrderMountTable(DestinationOrder order)
+      throws Exception {
+    final String mnt = "/" + order;
+    final String nsId = "ns0,ns1";
+    final String dest = "/";
+    String[] argv = new String[] {
+        "-add", mnt, nsId, dest, "-order", order.toString()};
+    assertEquals(0, ToolRunner.run(admin, argv));
+
+    // Check the state in the State Store
+    stateStore.loadCache(MountTableStoreImpl.class, true);
+    MountTableManager mountTable = client.getMountTableManager();
+    GetMountTableEntriesRequest request =
+        GetMountTableEntriesRequest.newInstance(mnt);
+    GetMountTableEntriesResponse response =
+        mountTable.getMountTableEntries(request);
+    List<MountTable> entries = response.getEntries();
+    assertEquals(1, entries.size());
+    assertEquals(2, entries.get(0).getDestinations().size());
+    assertEquals(order, response.getEntries().get(0).getDestOrder());
+  }
+
+  @Test
   public void testListMountTable() throws Exception {
     String nsId = "ns0";
     String src = "/test-lsmounttable";


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message