hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From inigo...@apache.org
Subject hadoop git commit: HDFS-13536. [PROVIDED Storage] HA for InMemoryAliasMap. Contributed by Virajith Jalaparti.
Date Mon, 02 Jul 2018 17:48:24 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 5cc2541a1 -> 1804a3151


HDFS-13536. [PROVIDED Storage] HA for InMemoryAliasMap. Contributed by Virajith Jalaparti.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1804a315
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1804a315
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1804a315

Branch: refs/heads/trunk
Commit: 1804a31515e541b3371925aa895589919b54d443
Parents: 5cc2541
Author: Inigo Goiri <inigoiri@apache.org>
Authored: Mon Jul 2 10:48:20 2018 -0700
Committer: Inigo Goiri <inigoiri@apache.org>
Committed: Mon Jul 2 10:48:20 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSUtilClient.java   |   4 +-
 .../hdfs/client/HdfsClientConfigKeys.java       |   3 +
 .../ha/ConfiguredFailoverProxyProvider.java     |   9 +-
 .../InMemoryAliasMapFailoverProxyProvider.java  |  38 ++
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   5 +-
 .../java/org/apache/hadoop/hdfs/DFSUtil.java    |  37 +-
 .../org/apache/hadoop/hdfs/NameNodeProxies.java |  15 +-
 ...yAliasMapProtocolClientSideTranslatorPB.java |  95 ++++-
 .../aliasmap/InMemoryAliasMapProtocol.java      |   5 +
 .../aliasmap/InMemoryLevelDBAliasMapServer.java |  19 +-
 .../impl/InMemoryLevelDBAliasMapClient.java     |  80 ++--
 .../src/main/resources/hdfs-default.xml         |  22 +-
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java  |  13 +-
 .../apache/hadoop/hdfs/MiniDFSNNTopology.java   |   2 +-
 .../impl/TestInMemoryLevelDBAliasMapClient.java |   7 +
 .../namenode/ITestProvidedImplementation.java   | 371 ++++++++++++++++---
 16 files changed, 615 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 313b973..3fac7c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -396,7 +396,7 @@ public class DFSUtilClient {
    * @param keys Set of keys to look for in the order of preference
    * @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
    */
-  static Map<String, Map<String, InetSocketAddress>> getAddresses(
+  public static Map<String, Map<String, InetSocketAddress>> getAddresses(
       Configuration conf, String defaultAddress, String... keys) {
     Collection<String> nameserviceIds = getNameServiceIds(conf);
     return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys);
@@ -426,7 +426,7 @@ public class DFSUtilClient {
     return ret;
   }
 
-  static Map<String, InetSocketAddress> getAddressesForNameserviceId(
+  public static Map<String, InetSocketAddress> getAddressesForNameserviceId(
       Configuration conf, String nsId, String defaultValue, String... keys) {
     Collection<String> nnIds = getNameNodeIds(conf, nsId);
     Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index f2cec31..a812670 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -184,6 +184,9 @@ public interface HdfsClientConfigKeys {
       "dfs.namenode.snapshot.capture.openfiles";
   boolean DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES_DEFAULT = false;
 
+  String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS =
+      "dfs.provided.aliasmap.inmemory.dnrpc-address";
+
   /**
    * These are deprecated config keys to client code.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
index 96722fc..f46532a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+
 /**
  * A FailoverProxyProvider implementation which allows one to configure
  * multiple URIs to connect to during fail-over. A random configured address is
@@ -60,6 +62,11 @@ public class ConfiguredFailoverProxyProvider<T> extends
 
   public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
       Class<T> xface, HAProxyFactory<T> factory) {
+    this(conf, uri, xface, factory, DFS_NAMENODE_RPC_ADDRESS_KEY);
+  }
+
+  public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
+      Class<T> xface, HAProxyFactory<T> factory, String addressKey) {
     this.xface = xface;
     this.conf = new Configuration(conf);
     int maxRetries = this.conf.getInt(
@@ -81,7 +88,7 @@ public class ConfiguredFailoverProxyProvider<T> extends
       ugi = UserGroupInformation.getCurrentUser();
 
       Map<String, Map<String, InetSocketAddress>> map =
-          DFSUtilClient.getHaNnRpcAddresses(conf);
+          DFSUtilClient.getAddresses(conf, null, addressKey);
       Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());
 
       if (addressesInNN == null || addressesInNN.size() == 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/InMemoryAliasMapFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/InMemoryAliasMapFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/InMemoryAliasMapFailoverProxyProvider.java
new file mode 100644
index 0000000..6525942
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/InMemoryAliasMapFailoverProxyProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.net.URI;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
+
+/**
+ * A {@link ConfiguredFailoverProxyProvider} implementation used to connect
+ * to an InMemoryAliasMap.
+ */
+public class InMemoryAliasMapFailoverProxyProvider<T>
+    extends ConfiguredFailoverProxyProvider<T> {
+
+  public InMemoryAliasMapFailoverProxyProvider(
+      Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory) {
+    super(conf, uri, xface, factory,
+        DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index dde7eb7..cc902b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -86,8 +86,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
   public static final String  DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50105";
   public static final String  DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address";
-  public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS = "dfs.provided.aliasmap.inmemory.dnrpc-address";
+  public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS =
+      HdfsClientConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
   public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT = "0.0.0.0:50200";
+  public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST = "dfs.provided.aliasmap.inmemory.rpc.bind-host";
+
   public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR = "dfs.provided.aliasmap.inmemory.leveldb.dir";
   public static final String DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE = "dfs.provided.aliasmap.inmemory.batch-size";
   public static final int DFS_PROVIDED_ALIASMAP_INMEMORY_BATCH_SIZE_DEFAULT = 500;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 4c94e38..f7cd32b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -1130,7 +1130,42 @@ public class DFSUtil {
     
     return getSuffixIDs(conf, addressKey, null, nnId, LOCAL_ADDRESS_MATCHER)[0];
   }
-  
+
+  /**
+   * Determine the {@link InetSocketAddress} to bind to, for any service.
+   * In case of HA or federation, the address is assumed to be specified as
+   * {@code confKey}.NAMESPACEID.NAMENODEID as appropriate.
+   *
+   * @param conf configuration.
+   * @param confKey configuration key (prefix if HA/federation) used to
+   *        specify the address for the service.
+   * @param defaultValue default value for the address.
+   * @param bindHostKey configuration key (prefix if HA/federation)
+   *        specifying host to bind to.
+   * @return the address to bind to.
+   */
+  public static InetSocketAddress getBindAddress(Configuration conf,
+      String confKey, String defaultValue, String bindHostKey) {
+    InetSocketAddress address;
+    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
+    String bindHostActualKey;
+    if (nsId != null) {
+      String namenodeId = HAUtil.getNameNodeId(conf, nsId);
+      address = DFSUtilClient.getAddressesForNameserviceId(
+          conf, nsId, null, confKey).get(namenodeId);
+      bindHostActualKey = DFSUtil.addKeySuffixes(bindHostKey, nsId, namenodeId);
+    } else {
+      address = NetUtils.createSocketAddr(conf.get(confKey, defaultValue));
+      bindHostActualKey = bindHostKey;
+    }
+
+    String bindHost = conf.get(bindHostActualKey);
+    if (bindHost == null || bindHost.isEmpty()) {
+      bindHost = address.getHostName();
+    }
+    return new InetSocketAddress(bindHost, address.getPort());
+  }
+
   /**
    * Returns nameservice Id and namenode Id when the local host matches the
    * configuration parameter {@code addressKey}.<nameservice Id>.<namenode Id>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
index d556c90..b63d26b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
@@ -31,10 +31,13 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocolPB.AliasMapProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.InMemoryAliasMapProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
 import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.namenode.ha.NameNodeHAProxyFactory;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
@@ -184,6 +187,8 @@ public class NameNodeProxies {
           conf, ugi);
     } else if (xface == RefreshCallQueueProtocol.class) {
       proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi);
+    } else if (xface == InMemoryAliasMapProtocol.class) {
+      proxy = (T) createNNProxyWithInMemoryAliasMapProtocol(nnAddr, conf, ugi);
     } else {
       String message = "Unsupported protocol found when creating the proxy " +
           "connection to NameNode: " +
@@ -194,7 +199,15 @@ public class NameNodeProxies {
 
     return new ProxyAndInfo<T>(proxy, dtService, nnAddr);
   }
-  
+
+  private static InMemoryAliasMapProtocol createNNProxyWithInMemoryAliasMapProtocol(
+      InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
+      throws IOException {
+    AliasMapProtocolPB proxy = (AliasMapProtocolPB) createNameNodeProxy(
+        address, conf, ugi, AliasMapProtocolPB.class, 30000);
+    return new InMemoryAliasMapProtocolClientSideTranslatorPB(proxy);
+  }
+
   private static JournalProtocol createNNProxyWithJournalProtocol(
       InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java
index fc23c88..2025c16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java
@@ -20,27 +20,38 @@ import com.google.protobuf.ServiceException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
 import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
 import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
 import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
+import org.apache.hadoop.hdfs.server.namenode.ha.InMemoryAliasMapFailoverProxyProvider;
 import org.apache.hadoop.ipc.ProtobufHelper;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import java.io.Closeable;
 import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSUtil.addKeySuffixes;
+import static org.apache.hadoop.hdfs.DFSUtil.createUri;
+import static org.apache.hadoop.hdfs.DFSUtilClient.getNameServiceIds;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX;
 import static org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.*;
 import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.*;
 
@@ -52,7 +63,7 @@ import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.*;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class InMemoryAliasMapProtocolClientSideTranslatorPB
-    implements InMemoryAliasMapProtocol {
+    implements InMemoryAliasMapProtocol, Closeable {
 
   private static final Logger LOG =
       LoggerFactory
@@ -60,22 +71,61 @@ public class InMemoryAliasMapProtocolClientSideTranslatorPB
 
   private AliasMapProtocolPB rpcProxy;
 
-  public InMemoryAliasMapProtocolClientSideTranslatorPB(Configuration conf) {
-    String addr = conf.getTrimmed(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
-        DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT);
-    InetSocketAddress aliasMapAddr = NetUtils.createSocketAddr(addr);
+  public InMemoryAliasMapProtocolClientSideTranslatorPB(
+      AliasMapProtocolPB rpcProxy) {
+    this.rpcProxy = rpcProxy;
+  }
 
-    RPC.setProtocolEngine(conf, AliasMapProtocolPB.class,
-        ProtobufRpcEngine.class);
-    LOG.info("Connecting to address: " + addr);
-    try {
-      rpcProxy = RPC.getProxy(AliasMapProtocolPB.class,
-          RPC.getProtocolVersion(AliasMapProtocolPB.class), aliasMapAddr, null,
-          conf, NetUtils.getDefaultSocketFactory(conf), 0);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          "Error in connecting to " + addr + " Got: " + e);
+  public static Collection<InMemoryAliasMapProtocol> init(Configuration conf) {
+    Collection<InMemoryAliasMapProtocol> aliasMaps = new ArrayList<>();
+    // Try to connect to all configured nameservices as it is not known which
+    // nameservice supports the AliasMap.
+    for (String nsId : getNameServiceIds(conf)) {
+      try {
+        URI namenodeURI = null;
+        Configuration newConf = new Configuration(conf);
+        if (HAUtil.isHAEnabled(conf, nsId)) {
+          // set the failover-proxy provider if HA is enabled.
+          newConf.setClass(
+              addKeySuffixes(PROXY_PROVIDER_KEY_PREFIX, nsId),
+              InMemoryAliasMapFailoverProxyProvider.class,
+              AbstractNNFailoverProxyProvider.class);
+          namenodeURI = new URI(HdfsConstants.HDFS_URI_SCHEME + "://" + nsId);
+        } else {
+          String key =
+              addKeySuffixes(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, nsId);
+          String addr = conf.get(key);
+          if (addr != null) {
+            namenodeURI = createUri(HdfsConstants.HDFS_URI_SCHEME,
+                NetUtils.createSocketAddr(addr));
+          }
+        }
+        if (namenodeURI != null) {
+          aliasMaps.add(NameNodeProxies
+              .createProxy(newConf, namenodeURI, InMemoryAliasMapProtocol.class)
+              .getProxy());
+          LOG.info("Connected to InMemoryAliasMap at {}", namenodeURI);
+        }
+      } catch (IOException | URISyntaxException e) {
+        LOG.warn("Exception in connecting to InMemoryAliasMap for nameservice "
+            + "{}: {}", nsId, e);
+      }
     }
+    // if a separate AliasMap is configured using
+    // DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, try to connect to it.
+    if (conf.get(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS) != null) {
+      URI uri = createUri("hdfs", NetUtils.createSocketAddr(
+          conf.get(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS)));
+      try {
+        aliasMaps.add(NameNodeProxies
+            .createProxy(conf, uri, InMemoryAliasMapProtocol.class).getProxy());
+        LOG.info("Connected to InMemoryAliasMap at {}", uri);
+      } catch (IOException e) {
+        LOG.warn("Exception in connecting to InMemoryAliasMap at {}: {}", uri,
+            e);
+      }
+    }
+    return aliasMaps;
   }
 
   @Override
@@ -168,7 +218,12 @@ public class InMemoryAliasMapProtocolClientSideTranslatorPB
     }
   }
 
-  public void stop() {
-    RPC.stopProxy(rpcProxy);
+  @Override
+  public void close() throws IOException {
+    LOG.info("Stopping rpcProxy in" +
+        "InMemoryAliasMapProtocolClientSideTranslatorPB");
+    if (rpcProxy != null) {
+      RPC.stopProxy(rpcProxy);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java
index 89f590c..c3824e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryAliasMapProtocol.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
 import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.io.retry.Idempotent;
 
 import javax.annotation.Nonnull;
 import java.io.IOException;
@@ -69,6 +70,7 @@ public interface InMemoryAliasMapProtocol {
    * FileRegions and the next marker.
    * @throws IOException
    */
+  @Idempotent
   InMemoryAliasMap.IterationResult list(Optional<Block> marker)
       throws IOException;
 
@@ -80,6 +82,7 @@ public interface InMemoryAliasMapProtocol {
    * @throws IOException
    */
   @Nonnull
+  @Idempotent
   Optional<ProvidedStorageLocation> read(@Nonnull Block block)
       throws IOException;
 
@@ -90,6 +93,7 @@ public interface InMemoryAliasMapProtocol {
    * @param providedStorageLocation
    * @throws IOException
    */
+  @Idempotent
   void write(@Nonnull Block block,
       @Nonnull ProvidedStorageLocation providedStorageLocation)
       throws IOException;
@@ -99,5 +103,6 @@ public interface InMemoryAliasMapProtocol {
    * @return the block pool id associated with the Namenode running
    * the in-memory alias map.
    */
+  @Idempotent
   String getBlockPoolId() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java
index 4edc9a2..1d06f13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/aliasmap/InMemoryLevelDBAliasMapServer.java
@@ -25,7 +25,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
 import org.apache.hadoop.hdfs.protocolPB.AliasMapProtocolPB;
@@ -34,9 +33,13 @@ import org.apache.hadoop.ipc.RPC;
 import javax.annotation.Nonnull;
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.Optional;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST;
+import static org.apache.hadoop.hdfs.DFSUtil.getBindAddress;
 import static org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.*;
 import static org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap.CheckedFunction2;
 
@@ -79,18 +82,16 @@ public class InMemoryLevelDBAliasMapServer implements InMemoryAliasMapProtocol,
         AliasMapProtocolService
             .newReflectiveBlockingService(aliasMapProtocolXlator);
 
-    String rpcAddress =
-        conf.get(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
-            DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT);
-    String[] split = rpcAddress.split(":");
-    String bindHost = split[0];
-    Integer port = Integer.valueOf(split[1]);
+    InetSocketAddress rpcAddress = getBindAddress(conf,
+        DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
+        DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS_DEFAULT,
+        DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST);
 
     aliasMapServer = new RPC.Builder(conf)
         .setProtocol(AliasMapProtocolPB.class)
         .setInstance(aliasMapProtocolService)
-        .setBindAddress(bindHost)
-        .setPort(port)
+        .setBindAddress(rpcAddress.getHostName())
+        .setPort(rpcAddress.getPort())
         .setNumHandlers(1)
         .setVerbose(true)
         .build();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java
index d389184..fb5ee93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/InMemoryLevelDBAliasMapClient.java
@@ -24,11 +24,17 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ProvidedStorageLocation;
 import org.apache.hadoop.hdfs.protocolPB.InMemoryAliasMapProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
+import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
 import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
 import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -44,17 +50,28 @@ import java.util.Optional;
 public class InMemoryLevelDBAliasMapClient extends BlockAliasMap<FileRegion>
     implements Configurable {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(InMemoryLevelDBAliasMapClient.class);
   private Configuration conf;
-  private InMemoryAliasMapProtocolClientSideTranslatorPB aliasMap;
-  private String blockPoolID;
+  private Collection<InMemoryAliasMapProtocol> aliasMaps;
 
   @Override
   public void close() {
-    aliasMap.stop();
+    if (aliasMaps != null) {
+      for (InMemoryAliasMapProtocol aliasMap : aliasMaps) {
+        RPC.stopProxy(aliasMap);
+      }
+    }
   }
 
   class LevelDbReader extends BlockAliasMap.Reader<FileRegion> {
 
+    private InMemoryAliasMapProtocol aliasMap;
+
+    LevelDbReader(InMemoryAliasMapProtocol aliasMap) {
+      this.aliasMap = aliasMap;
+    }
+
     @Override
     public Optional<FileRegion> resolve(Block block) throws IOException {
       Optional<ProvidedStorageLocation> read = aliasMap.read(block);
@@ -114,6 +131,13 @@ public class InMemoryLevelDBAliasMapClient extends BlockAliasMap<FileRegion>
   }
 
   class LevelDbWriter extends BlockAliasMap.Writer<FileRegion> {
+
+    private InMemoryAliasMapProtocol aliasMap;
+
+    LevelDbWriter(InMemoryAliasMapProtocol aliasMap) {
+      this.aliasMap = aliasMap;
+    }
+
     @Override
     public void store(FileRegion fileRegion) throws IOException {
       aliasMap.write(fileRegion.getBlock(),
@@ -130,40 +154,53 @@ public class InMemoryLevelDBAliasMapClient extends BlockAliasMap<FileRegion>
       throw new UnsupportedOperationException("Unable to start "
           + "InMemoryLevelDBAliasMapClient as security is enabled");
     }
+    aliasMaps = new ArrayList<>();
   }
 
-
-  @Override
-  public Reader<FileRegion> getReader(Reader.Options opts, String blockPoolID)
+  private InMemoryAliasMapProtocol getAliasMap(String blockPoolID)
       throws IOException {
-    if (this.blockPoolID == null) {
-      this.blockPoolID = aliasMap.getBlockPoolId();
+    if (blockPoolID == null) {
+      throw new IOException("Block pool id required to get aliasmap reader");
     }
     // if a block pool id has been supplied, and doesn't match the associated
-    // block pool id, return null.
-    if (blockPoolID != null && this.blockPoolID != null
-        && !this.blockPoolID.equals(blockPoolID)) {
-      return null;
+    // block pool ids, an IOException is thrown.
+    for (InMemoryAliasMapProtocol aliasMap : aliasMaps) {
+      try {
+        String aliasMapBlockPoolId = aliasMap.getBlockPoolId();
+        if (aliasMapBlockPoolId != null &&
+            aliasMapBlockPoolId.equals(blockPoolID)) {
+          return aliasMap;
+        }
+      } catch (IOException e) {
+        LOG.error("Exception in retrieving block pool id {}", e);
+      }
     }
-    return new LevelDbReader();
+    throw new IOException(
+        "Unable to retrive InMemoryAliasMap for block pool id " + blockPoolID);
+  }
+
+  @Override
+  public Reader<FileRegion> getReader(Reader.Options opts, String blockPoolID)
+      throws IOException {
+    InMemoryAliasMapProtocol aliasMap = getAliasMap(blockPoolID);
+    LOG.info("Loading InMemoryAliasMapReader for block pool id {}",
+        blockPoolID);
+    return new LevelDbReader(aliasMap);
   }
 
   @Override
   public Writer<FileRegion> getWriter(Writer.Options opts, String blockPoolID)
       throws IOException {
-    if (this.blockPoolID == null) {
-      this.blockPoolID = aliasMap.getBlockPoolId();
-    }
-    if (blockPoolID != null && !this.blockPoolID.equals(blockPoolID)) {
-      return null;
-    }
-    return new LevelDbWriter();
+    InMemoryAliasMapProtocol aliasMap = getAliasMap(blockPoolID);
+    LOG.info("Loading InMemoryAliasMapWriter for block pool id {}",
+        blockPoolID);
+    return new LevelDbWriter(aliasMap);
   }
 
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
-    this.aliasMap = new InMemoryAliasMapProtocolClientSideTranslatorPB(conf);
+    aliasMaps = InMemoryAliasMapProtocolClientSideTranslatorPB.init(conf);
   }
 
   @Override
@@ -174,5 +211,4 @@ public class InMemoryLevelDBAliasMapClient extends BlockAliasMap<FileRegion>
   @Override
   public void refresh() throws IOException {
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 146ae6c..6dd2d92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4817,9 +4817,27 @@
 
   <property>
     <name>dfs.provided.aliasmap.inmemory.dnrpc-address</name>
-    <value>0.0.0.0:50200</value>
+    <value></value>
+    <description>
+      The address where the aliasmap server will be running. In the case of
+      HA/Federation where multiple namenodes exist, and if the Namenode is
+      configured to run the aliasmap server
+      (dfs.provided.aliasmap.inmemory.enabled is set to true),
+      the name service id is added to the name, e.g.,
+      dfs.provided.aliasmap.inmemory.dnrpc-address.EXAMPLENAMESERVICE.
+      The value of this property will take the form of host:rpc-port.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.provided.aliasmap.inmemory.rpc.bind-host</name>
+    <value></value>
     <description>
-      The address where the aliasmap server will be running
+      The actual address the in-memory aliasmap server will bind to.
+      If this optional address is set, it overrides the hostname portion of
+      dfs.provided.aliasmap.inmemory.dnrpc-address.
+      This is useful for making the name node listen on all interfaces by
+      setting it to 0.0.0.0.
     </description>
   </property>
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index c2e2a68..a2e5951 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -1187,7 +1187,7 @@ public class MiniDFSCluster implements AutoCloseable {
   }
 
 
-  private void initNameNodeConf(Configuration conf, String nameserviceId, int nsIndex, String nnId,
+  protected void initNameNodeConf(Configuration conf, String nameserviceId, int nsIndex, String nnId,
       boolean manageNameDfsDirs, boolean enableManagedDfsDirsRedundancy, int nnIndex)
       throws IOException {
     if (nameserviceId != null) {
@@ -1379,6 +1379,17 @@ public class MiniDFSCluster implements AutoCloseable {
     return null;
   }
 
+  public List<Integer> getNNIndexes(String nameserviceId) {
+    int count = 0;
+    List<Integer> nnIndexes = new ArrayList<>();
+    for (NameNodeInfo nn : namenodes.values()) {
+      if (nn.getNameserviceId().equals(nameserviceId)) {
+        nnIndexes.add(count);
+      }
+      count++;
+    }
+    return nnIndexes;
+  }
 
   /**
    * wait for the given namenode to get out of safemode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
index b9786a3..c21ff80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
@@ -227,7 +227,7 @@ public class MiniDFSNNTopology {
       this.nnId = nnId;
     }
 
-    String getNnId() {
+    public String getNnId() {
       return nnId;
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestInMemoryLevelDBAliasMapClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestInMemoryLevelDBAliasMapClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestInMemoryLevelDBAliasMapClient.java
index 61a1558..f062633 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestInMemoryLevelDBAliasMapClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/blockaliasmap/impl/TestInMemoryLevelDBAliasMapClient.java
@@ -32,6 +32,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -341,4 +342,10 @@ public class TestInMemoryLevelDBAliasMapClient {
     assertThat(actualFileRegions).containsExactlyInAnyOrder(
         expectedFileRegions.toArray(new FileRegion[0]));
   }
+
+  @Test
+  public void testServerBindHost() throws Exception {
+    conf.set(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY, "0.0.0.0");
+    writeRead();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1804a315/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/ITestProvidedImplementation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/ITestProvidedImplementation.java b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/ITestProvidedImplementation.java
index 7d3ab0e..e3f4dec 100644
--- a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/ITestProvidedImplementation.java
+++ b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/ITestProvidedImplementation.java
@@ -24,12 +24,18 @@ import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
-import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
@@ -44,14 +50,14 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
-import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
@@ -60,8 +66,10 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.ProvidedStorageMap;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
 import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.InMemoryLevelDBAliasMapClient;
+import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.LevelDBFileRegionAliasMap;
 import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 
@@ -71,6 +79,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
 import org.junit.After;
 import org.junit.Before;
@@ -80,6 +90,12 @@ import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LEVELDB_PATH;
+import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 import static org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap.fileNameFromBlockPoolID;
 import static org.apache.hadoop.net.NodeBase.PATH_SEPARATOR_STR;
 import static org.junit.Assert.*;
@@ -106,6 +122,7 @@ public class ITestProvidedImplementation {
   private final int baseFileLen = 1024;
   private long providedDataSize = 0;
   private final String bpid = "BP-1234-10.1.1.1-1224";
+  private static final String clusterID = "CID-PROVIDED";
 
   private Configuration conf;
   private MiniDFSCluster cluster;
@@ -214,36 +231,78 @@ public class ITestProvidedImplementation {
       StorageType[] storageTypes,
       StorageType[][] storageTypesPerDatanode,
       boolean doFormat, String[] racks) throws IOException {
+    startCluster(nspath, numDatanodes,
+        storageTypes, storageTypesPerDatanode,
+        doFormat, racks, null,
+        new MiniDFSCluster.Builder(conf));
+  }
+
+  void startCluster(Path nspath, int numDatanodes,
+      StorageType[] storageTypes,
+      StorageType[][] storageTypesPerDatanode,
+      boolean doFormat, String[] racks,
+      MiniDFSNNTopology topo,
+      MiniDFSCluster.Builder builder) throws IOException {
     conf.set(DFS_NAMENODE_NAME_DIR_KEY, nspath.toString());
 
+    builder.format(doFormat)
+        .manageNameDfsDirs(doFormat)
+        .numDataNodes(numDatanodes)
+        .racks(racks);
     if (storageTypesPerDatanode != null) {
-      cluster = new MiniDFSCluster.Builder(conf)
-          .format(doFormat)
-          .manageNameDfsDirs(doFormat)
-          .numDataNodes(numDatanodes)
-          .storageTypes(storageTypesPerDatanode)
-          .racks(racks)
-          .build();
+      builder.storageTypes(storageTypesPerDatanode);
     } else if (storageTypes != null) {
-      cluster = new MiniDFSCluster.Builder(conf)
-          .format(doFormat)
-          .manageNameDfsDirs(doFormat)
-          .numDataNodes(numDatanodes)
-          .storagesPerDatanode(storageTypes.length)
-          .storageTypes(storageTypes)
-          .racks(racks)
-          .build();
-    } else {
-      cluster = new MiniDFSCluster.Builder(conf)
-          .format(doFormat)
-          .manageNameDfsDirs(doFormat)
-          .numDataNodes(numDatanodes)
-          .racks(racks)
-          .build();
+      builder.storagesPerDatanode(storageTypes.length)
+          .storageTypes(storageTypes);
     }
+    if (topo != null) {
+      builder.nnTopology(topo);
+      // If HA or Federation is enabled and formatting is set to false,
+      // copy the FSImage to all Namenode directories.
+      if ((topo.isHA() || topo.isFederated()) && !doFormat) {
+        builder.manageNameDfsDirs(true);
+        builder.enableManagedDfsDirsRedundancy(false);
+        builder.manageNameDfsSharedDirs(false);
+        List<File> nnDirs =
+            getProvidedNamenodeDirs(MiniDFSCluster.getBaseDirectory(), topo);
+        for (File nnDir : nnDirs) {
+          MiniDFSCluster.copyNameDirs(
+              Collections.singletonList(nspath.toUri()),
+              Collections.singletonList(fileAsURI(nnDir)),
+              conf);
+        }
+      }
+    }
+    cluster = builder.build();
     cluster.waitActive();
   }
 
+  private static List<File> getProvidedNamenodeDirs(String baseDir,
+      MiniDFSNNTopology topo) {
+    List<File> nnDirs = new ArrayList<>();
+    int nsCounter = 0;
+    for (MiniDFSNNTopology.NSConf nsConf : topo.getNameservices()) {
+      int nnCounter = nsCounter;
+      for (MiniDFSNNTopology.NNConf nnConf : nsConf.getNNs()) {
+        if (providedNameservice.equals(nsConf.getId())) {
+          // only add the first one
+          File[] nnFiles =
+              MiniDFSCluster.getNameNodeDirectory(
+                  baseDir, nsCounter, nnCounter);
+          if (nnFiles == null || nnFiles.length == 0) {
+            throw new RuntimeException("Failed to get a location for the"
+                + "Namenode directory for namespace: " + nsConf.getId()
+                + " and namenodeId: " + nnConf.getNnId());
+          }
+          nnDirs.add(nnFiles[0]);
+        }
+        nnCounter++;
+      }
+      nsCounter = nnCounter;
+    }
+    return nnDirs;
+  }
+
   @Test(timeout=20000)
   public void testLoadImage() throws Exception {
     final long seed = r.nextLong();
@@ -405,8 +464,8 @@ public class ITestProvidedImplementation {
     return ret;
   }
 
-  private void verifyFileSystemContents() throws Exception {
-    FileSystem fs = cluster.getFileSystem();
+  private void verifyFileSystemContents(int nnIndex) throws Exception {
+    FileSystem fs = cluster.getFileSystem(nnIndex);
     int count = 0;
     // read NN metadata, verify contents match
     for (TreePath e : new FSTreeWalk(providedPath, conf)) {
@@ -766,41 +825,255 @@ public class ITestProvidedImplementation {
     }
   }
 
-
-  @Test
-  public void testInMemoryAliasMap() throws Exception {
-    conf.setClass(ImageWriter.Options.UGI_CLASS,
-        FsUGIResolver.class, UGIResolver.class);
+  private File createInMemoryAliasMapImage() throws Exception {
+    conf.setClass(ImageWriter.Options.UGI_CLASS, FsUGIResolver.class,
+        UGIResolver.class);
     conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
         InMemoryLevelDBAliasMapClient.class, BlockAliasMap.class);
-    conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
-        "localhost:32445");
+    conf.set(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS, "localhost:32445");
     File tempDirectory =
-        Files.createTempDirectory("in-memory-alias-map").toFile();
-    File leveDBPath = new File(tempDirectory, bpid);
-    leveDBPath.mkdirs();
-    conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
+        new File(new Path(nnDirPath, "in-memory-alias-map").toUri());
+    File levelDBDir = new File(tempDirectory, bpid);
+    levelDBDir.mkdirs();
+    conf.set(DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
         tempDirectory.getAbsolutePath());
-    conf.setBoolean(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
     conf.setInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES, 10);
-    InMemoryLevelDBAliasMapServer levelDBAliasMapServer =
-        new InMemoryLevelDBAliasMapServer(InMemoryAliasMap::init, bpid);
-    levelDBAliasMapServer.setConf(conf);
-    levelDBAliasMapServer.start();
+    conf.set(DFS_PROVIDED_ALIASMAP_LEVELDB_PATH,
+        tempDirectory.getAbsolutePath());
 
     createImage(new FSTreeWalk(providedPath, conf),
         nnDirPath,
-        FixedBlockResolver.class, "",
-        InMemoryLevelDBAliasMapClient.class);
-    levelDBAliasMapServer.close();
+        FixedBlockResolver.class, clusterID,
+        LevelDBFileRegionAliasMap.class);
+
+    return tempDirectory;
+  }
 
+  @Test
+  public void testInMemoryAliasMap() throws Exception {
+    File aliasMapImage = createInMemoryAliasMapImage();
     // start cluster with two datanodes,
     // each with 1 PROVIDED volume and other DISK volume
+    conf.setBoolean(DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
+    conf.setInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES, 10);
     startCluster(nnDirPath, 2,
         new StorageType[] {StorageType.PROVIDED, StorageType.DISK},
         null, false);
-    verifyFileSystemContents();
-    FileUtils.deleteDirectory(tempDirectory);
+    verifyFileSystemContents(0);
+    FileUtils.deleteDirectory(aliasMapImage);
+  }
+
+  /**
+   * Find a free port that hasn't been assigned yet.
+   *
+   * @param usedPorts set of ports that have already been assigned.
+   * @param maxTrials maximum number of random ports to try before failure.
+   * @return an unassigned port.
+   */
+  private int getUnAssignedPort(Set<Integer> usedPorts, int maxTrials) {
+    int count = 0;
+    while (count < maxTrials) {
+      int port = NetUtils.getFreeSocketPort();
+      if (usedPorts.contains(port)) {
+        count++;
+      } else {
+        return port;
+      }
+    }
+    return -1;
+  }
+
+  private static String providedNameservice;
+
+  /**
+   * Extends the {@link MiniDFSCluster.Builder} to create instances of
+   * {@link MiniDFSClusterAliasMap}.
+   */
+  private static class MiniDFSClusterBuilderAliasMap
+      extends MiniDFSCluster.Builder {
+
+    MiniDFSClusterBuilderAliasMap(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    public MiniDFSCluster build() throws IOException {
+      return new MiniDFSClusterAliasMap(this);
+    }
+  }
+
+  /**
+   * Extends {@link MiniDFSCluster} to correctly configure the InMemoryAliasMap.
+   */
+  private static class MiniDFSClusterAliasMap extends MiniDFSCluster {
+
+    private Map<String, Collection<URI>> formattedDirsByNamespaceId;
+    private Set<Integer> completedNNs;
+
+    MiniDFSClusterAliasMap(MiniDFSCluster.Builder builder) throws IOException {
+      super(builder);
+    }
+
+    @Override
+    protected void initNameNodeConf(Configuration conf, String nameserviceId,
+        int nsIndex, String nnId, boolean manageNameDfsDirs,
+        boolean enableManagedDfsDirsRedundancy, int nnIndex)
+        throws IOException {
+
+      if (formattedDirsByNamespaceId == null) {
+        formattedDirsByNamespaceId = new HashMap<>();
+        completedNNs = new HashSet<>();
+      }
+
+      super.initNameNodeConf(conf, nameserviceId, nsIndex, nnId,
+          manageNameDfsDirs, enableManagedDfsDirsRedundancy, nnIndex);
+
+      if (providedNameservice.equals(nameserviceId)) {
+        // configure the InMemoryAliasMap.
+        conf.setBoolean(DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
+        String directory = conf.get(DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR);
+        if (directory == null) {
+          throw new IllegalArgumentException("In-memory alias map configured"
+              + "with the proper location; Set "
+              + DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR);
+        }
+        // get the name of the directory (final component in path) used for map.
+        // Assume that the aliasmap is configured with the same final component
+        // name in all Namenodes but is located in the path specified by
+        // DFS_NAMENODE_NAME_DIR_KEY
+        String dirName = new Path(directory).getName();
+        String nnDir =
+            conf.getTrimmedStringCollection(DFS_NAMENODE_NAME_DIR_KEY)
+                .iterator().next();
+        conf.set(DFS_PROVIDED_ALIASMAP_INMEMORY_LEVELDB_DIR,
+            new File(new Path(nnDir, dirName).toUri()).getAbsolutePath());
+        conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true);
+      } else {
+        if (!completedNNs.contains(nnIndex)) {
+          // format the NN directories for non-provided namespaces
+          // if the directory for a namespace has been formatted, copy it over.
+          Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
+          if (formattedDirsByNamespaceId.containsKey(nameserviceId)) {
+            copyNameDirs(formattedDirsByNamespaceId.get(nameserviceId),
+                namespaceDirs, conf);
+          } else {
+            for (URI nameDirUri : namespaceDirs) {
+              File nameDir = new File(nameDirUri);
+              if (nameDir.exists() && !FileUtil.fullyDelete(nameDir)) {
+                throw new IOException("Could not fully delete " + nameDir);
+              }
+            }
+            HdfsServerConstants.StartupOption.FORMAT.setClusterId(clusterID);
+            DFSTestUtil.formatNameNode(conf);
+            formattedDirsByNamespaceId.put(nameserviceId, namespaceDirs);
+          }
+          conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, false);
+          completedNNs.add(nnIndex);
+        }
+      }
+    }
+  }
+
+  /**
+   * Configures the addresses of the InMemoryAliasMap.
+   *
+   * @param topology the MiniDFS topology to use.
+   * @param providedNameservice the nameservice id that supports provided.
+   */
+  private void configureAliasMapAddresses(MiniDFSNNTopology topology,
+      String providedNameservice) {
+    conf.unset(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS);
+    Set<Integer> assignedPorts = new HashSet<>();
+    for (MiniDFSNNTopology.NSConf nsConf : topology.getNameservices()) {
+      for (MiniDFSNNTopology.NNConf nnConf : nsConf.getNNs()) {
+        if (providedNameservice.equals(nsConf.getId())) {
+          String key =
+              DFSUtil.addKeySuffixes(DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_ADDRESS,
+                  nsConf.getId(), nnConf.getNnId());
+          int port = getUnAssignedPort(assignedPorts, 10);
+          if (port == -1) {
+            throw new RuntimeException("No free ports available");
+          }
+          assignedPorts.add(port);
+          conf.set(key, "127.0.0.1:" + port);
+
+          String binHostKey =
+              DFSUtil.addKeySuffixes(
+                  DFS_PROVIDED_ALIASMAP_INMEMORY_RPC_BIND_HOST,
+                  nsConf.getId(), nnConf.getNnId());
+          conf.set(binHostKey, "0.0.0.0");
+        }
+      }
+    }
+  }
+
+  /**
+   * Verify the mounted contents of the Filesystem.
+   *
+   * @param topology the topology of the cluster.
+   * @param providedNameservice the nameservice id of the provided namenodes.
+   * @throws Exception
+   */
+  private void verifyPathsWithHAFailoverIfNecessary(MiniDFSNNTopology topology,
+      String providedNameservice) throws Exception {
+    List<Integer> nnIndexes = cluster.getNNIndexes(providedNameservice);
+    if (topology.isHA()) {
+      int nn1 = nnIndexes.get(0);
+      int nn2 = nnIndexes.get(1);
+      try {
+        verifyFileSystemContents(nn1);
+        fail("Read operation should fail as no Namenode is active");
+      } catch (RemoteException e) {
+        LOG.info("verifyPaths failed!. Expected exception: {}" + e);
+      }
+      cluster.transitionToActive(nn1);
+      LOG.info("Verifying data from NN with index = {}", nn1);
+      verifyFileSystemContents(nn1);
+      // transition to the second namenode.
+      cluster.transitionToStandby(nn1);
+      cluster.transitionToActive(nn2);
+      LOG.info("Verifying data from NN with index = {}", nn2);
+      verifyFileSystemContents(nn2);
+
+      cluster.shutdownNameNodes();
+      try {
+        verifyFileSystemContents(nn2);
+        fail("Read operation should fail as no Namenode is active");
+      } catch (NullPointerException e) {
+        LOG.info("verifyPaths failed!. Expected exception: {}" + e);
+      }
+    } else {
+      verifyFileSystemContents(nnIndexes.get(0));
+    }
+  }
+
+  @Test
+  public void testInMemoryAliasMapMultiTopologies() throws Exception {
+    MiniDFSNNTopology[] topologies =
+        new MiniDFSNNTopology[] {
+            MiniDFSNNTopology.simpleHATopology(),
+            MiniDFSNNTopology.simpleFederatedTopology(3),
+            MiniDFSNNTopology.simpleHAFederatedTopology(3)
+        };
+
+    for (MiniDFSNNTopology topology : topologies) {
+      LOG.info("Starting test with topology with HA = {}, federation = {}",
+          topology.isHA(), topology.isFederated());
+      setSeed();
+      createInMemoryAliasMapImage();
+      conf.setBoolean(DFS_PROVIDED_ALIASMAP_INMEMORY_ENABLED, true);
+      conf.setInt(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES, 10);
+      providedNameservice = topology.getNameservices().get(0).getId();
+      // configure the AliasMap addresses
+      configureAliasMapAddresses(topology, providedNameservice);
+      startCluster(nnDirPath, 2,
+          new StorageType[] {StorageType.PROVIDED, StorageType.DISK},
+          null, false, null, topology,
+          new MiniDFSClusterBuilderAliasMap(conf));
+
+      verifyPathsWithHAFailoverIfNecessary(topology, providedNameservice);
+      shutdown();
+    }
   }
 
   private DatanodeDescriptor getDatanodeDescriptor(DatanodeManager dnm,
@@ -919,7 +1192,7 @@ public class ITestProvidedImplementation {
       startCluster(nnDirPath, racks.length,
           new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
           null, false, racks);
-      verifyFileSystemContents();
+      verifyFileSystemContents(0);
       setAndUnsetReplication("/" + filePrefix + (numFiles - 1) + fileSuffix);
       cluster.shutdown();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message