hadoop-common-commits mailing list archives

From x...@apache.org
Subject [hadoop] branch trunk updated: HDDS-1586. Allow Ozone RPC client to read with topology awareness. Contributed by Sammi Chen.
Date Tue, 09 Jul 2019 21:45:19 GMT
This is an automated email from the ASF dual-hosted git repository.

xyao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0303072  HDDS-1586. Allow Ozone RPC client to read with topology awareness. Contributed by Sammi Chen.
0303072 is described below

commit 030307226a507ae3a680e37c36279221e14d1a9f
Author: Xiaoyu Yao <xyao@apache.org>
AuthorDate: Tue Jul 9 14:43:55 2019 -0700

    HDDS-1586. Allow Ozone RPC client to read with topology awareness. Contributed by Sammi Chen.
---
 .../apache/hadoop/hdds/scm/XceiverClientGrpc.java  |  37 +++++--
 .../hadoop/hdds/scm/XceiverClientManager.java      |  68 ++++++++++++-
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |   4 +-
 .../hadoop/hdds/protocol/DatanodeDetails.java      |  28 +++++-
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |   4 +
 .../hadoop/hdds/scm/net/NetworkTopology.java       |   4 +-
 .../hadoop/hdds/scm/net/NetworkTopologyImpl.java   |  53 ++++++----
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  |  79 ++++++++++++++-
 .../scm/protocol/ScmBlockLocationProtocol.java     |   9 ++
 ...lockLocationProtocolClientSideTranslatorPB.java |  34 +++++++
 .../hdds/scm/storage/ContainerProtocolCalls.java   |   4 +-
 .../org/apache/hadoop/ozone/audit/SCMAction.java   |   3 +-
 ...lockLocationProtocolServerSideTranslatorPB.java |  26 +++++
 .../src/main/proto/ScmBlockLocationProtocol.proto  |  16 +++
 hadoop-hdds/common/src/main/proto/hdds.proto       |   5 +-
 .../common/src/main/resources/ozone-default.xml    |   8 ++
 .../hdds/scm/net/TestNetworkTopologyImpl.java      |  35 ++++---
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |   2 +
 .../hdds/scm/server/SCMBlockProtocolServer.java    |  32 ++++++
 .../hadoop/hdds/scm/server/SCMConfigurator.java    |  19 ++++
 .../hdds/scm/server/StorageContainerManager.java   |   6 +-
 .../java/org/apache/hadoop/hdds/scm/TestUtils.java |   4 +-
 .../hadoop/hdds/scm/container/MockNodeManager.java |  14 ++-
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |  10 ++
 hadoop-ozone/common/src/main/bin/ozone             |   2 +
 .../{ozone-net-topology => ozone-topology}/.env    |   0
 .../docker-compose.yaml                            |  22 +++--
 .../docker-config                                  |   5 +-
 .../network-config                                 |   0
 .../{ozone-net-topology => ozone-topology}/test.sh |   0
 .../client/rpc/TestOzoneRpcClientAbstract.java     |  83 +++++++++++++++-
 .../client/rpc/TestOzoneRpcClientWithRatis.java    |  96 +++++++++++++++++-
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java | 107 ++++++++++++++++++++-
 .../org/apache/hadoop/ozone/om/TestOmMetrics.java  |   4 +-
 .../org/apache/hadoop/ozone/om/KeyManager.java     |   4 +-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  36 ++++++-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  12 ++-
 .../apache/hadoop/ozone/om/fs/OzoneManagerFS.java  |  11 ++-
 .../ozone/om/ScmBlockLocationTestIngClient.java    |   6 ++
 39 files changed, 803 insertions(+), 89 deletions(-)
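
For readers of this patch, a minimal sketch of how the new read-path API introduced below might be used. The acquireClientForReadData/releaseClientForReadData pair and the ScmConfigKeys constant come straight from the diff; the wrapper class, method name, and the way the Pipeline is obtained are illustrative assumptions only and are not part of this commit.

import java.io.IOException;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

/** Illustrative sketch only; not part of this commit. */
public final class TopologyAwareReadSketch {
  private TopologyAwareReadSketch() { }

  public static void readClosestFirst(Pipeline pipeline) throws IOException {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Key added by this patch; "true" is also the default value.
    conf.set(ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, "true");

    try (XceiverClientManager manager = new XceiverClientManager(conf)) {
      // Read clients are cached under a topology-aware key (closest node),
      // separately from the write clients acquired via acquireClient().
      XceiverClientSpi readClient = manager.acquireClientForReadData(pipeline);
      try {
        // Issue ReadChunk / GetSmallFile requests here; XceiverClientGrpc
        // walks pipeline.getNodesInOrder() (closest datanode first).
      } finally {
        manager.releaseClientForReadData(readClient, false);
      }
    }
  }
}

With the flag enabled (the default), XceiverClientGrpc connects to the closest datanode in the pipeline and retries read commands against the remaining nodes in distance order, as shown in the diff below.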

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 8dd3753..3cc6fa1 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
@@ -64,7 +65,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 /**
- * A Client for the storageContainer protocol.
+ * A Client for the storageContainer protocol used to read object data.
  */
 public class XceiverClientGrpc extends XceiverClientSpi {
   static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
@@ -76,6 +77,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   private final Semaphore semaphore;
   private boolean closed = false;
   private SecurityConfig secConfig;
+  private final boolean topologyAwareRead;
 
   /**
    * Constructs a client that can communicate with the Container framework on
@@ -96,6 +98,9 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     this.metrics = XceiverClientManager.getXceiverClientMetrics();
     this.channels = new HashMap<>();
     this.asyncStubs = new HashMap<>();
+    this.topologyAwareRead = Boolean.parseBoolean(config.get(
+        ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED,
+        ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT));
   }
 
   /**
@@ -103,9 +108,10 @@ public class XceiverClientGrpc extends XceiverClientSpi {
    */
   @Override
   public void connect() throws Exception {
-    // leader by default is the 1st datanode in the datanode list of pipleline
-    DatanodeDetails dn = this.pipeline.getFirstNode();
-    // just make a connection to the 1st datanode at the beginning
+    // Connect to the closest node; if no closest node is known, fall back to
+    // the first node, which is usually the leader in the pipeline.
+    DatanodeDetails dn = this.pipeline.getClosestNode();
+    // just make a connection to the picked datanode at the beginning
     connectToDatanode(dn, null);
   }
 
@@ -114,9 +120,11 @@ public class XceiverClientGrpc extends XceiverClientSpi {
    */
   @Override
   public void connect(String encodedToken) throws Exception {
-    // leader by default is the 1st datanode in the datanode list of pipleline
-    DatanodeDetails dn = this.pipeline.getFirstNode();
-    // just make a connection to the 1st datanode at the beginning
+    // Connect to the closest node; if no closest node is known, fall back to
+    // the first node, which is usually the leader in the pipeline.
+    DatanodeDetails dn;
+    dn = this.pipeline.getClosestNode();
+    // just make a connection to the picked datanode at the beginning
     connectToDatanode(dn, encodedToken);
   }
 
@@ -132,7 +140,8 @@ public class XceiverClientGrpc extends XceiverClientSpi {
 
     // Add credential context to the client call
     String userName = UserGroupInformation.getCurrentUser().getShortUserName();
-    LOG.debug("Connecting to server Port : " + dn.getIpAddress());
+    LOG.debug("Nodes in pipeline : {}", pipeline.getNodes().toString());
+    LOG.debug("Connecting to server : {}", dn.getIpAddress());
     NettyChannelBuilder channelBuilder =
         NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
             .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
@@ -252,7 +261,15 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     // TODO: cache the correct leader info in here, so that any subsequent calls
     // should first go to leader
     XceiverClientReply reply = new XceiverClientReply(null);
-    for (DatanodeDetails dn : pipeline.getNodes()) {
+    List<DatanodeDetails> datanodeList;
+    if ((request.getCmdType() == ContainerProtos.Type.ReadChunk ||
+        request.getCmdType() == ContainerProtos.Type.GetSmallFile) &&
+        topologyAwareRead) {
+      datanodeList = pipeline.getNodesInOrder();
+    } else {
+      datanodeList = pipeline.getNodes();
+    }
+    for (DatanodeDetails dn : datanodeList) {
       try {
         LOG.debug("Executing command " + request + " on datanode " + dn);
         // In case the command gets retried on a 2nd datanode,
@@ -349,6 +366,8 @@ public class XceiverClientGrpc extends XceiverClientSpi {
       reconnect(dn, token);
     }
 
+    LOG.debug("Send command {} to datanode {}", request.getCmdType().toString(),
+        dn.getNetworkFullPath());
     final CompletableFuture<ContainerCommandResponseProto> replyFuture =
         new CompletableFuture<>();
     semaphore.acquire();
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index f9b5e6d..57799aa 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -57,7 +59,8 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys
  * not being used for a period of time.
  */
 public class XceiverClientManager implements Closeable {
-
+  private static final Logger LOG =
+      LoggerFactory.getLogger(XceiverClientManager.class);
   //TODO : change this to SCM configuration class
   private final Configuration conf;
   private final Cache<String, XceiverClientSpi> clientCache;
@@ -65,6 +68,7 @@ public class XceiverClientManager implements Closeable {
 
   private static XceiverClientMetrics metrics;
   private boolean isSecurityEnabled;
+  private final boolean topologyAwareRead;
   /**
    * Creates a new XceiverClientManager.
    *
@@ -98,6 +102,9 @@ public class XceiverClientManager implements Closeable {
               }
             }
           }).build();
+    topologyAwareRead = Boolean.parseBoolean(conf.get(
+        ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED,
+        ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT));
   }
 
   @VisibleForTesting
@@ -118,12 +125,32 @@ public class XceiverClientManager implements Closeable {
    */
   public XceiverClientSpi acquireClient(Pipeline pipeline)
       throws IOException {
+    return acquireClient(pipeline, false);
+  }
+
+  /**
+   * Acquires an XceiverClientSpi connected to a container for read.
+   *
+   * If there is already a cached XceiverClientSpi, simply return
+   * the cached one; otherwise create a new one.
+   *
+   * @param pipeline the container pipeline for the client connection
+   * @return XceiverClientSpi connected to a container
+   * @throws IOException if a XceiverClientSpi cannot be acquired
+   */
+  public XceiverClientSpi acquireClientForReadData(Pipeline pipeline)
+      throws IOException {
+    return acquireClient(pipeline, true);
+  }
+
+  private XceiverClientSpi acquireClient(Pipeline pipeline, boolean read)
+      throws IOException {
     Preconditions.checkNotNull(pipeline);
     Preconditions.checkArgument(pipeline.getNodes() != null);
     Preconditions.checkArgument(!pipeline.getNodes().isEmpty());
 
     synchronized (clientCache) {
-      XceiverClientSpi info = getClient(pipeline);
+      XceiverClientSpi info = getClient(pipeline, read);
       info.incrementReference();
       return info;
     }
@@ -136,12 +163,28 @@ public class XceiverClientManager implements Closeable {
    * @param invalidateClient if true, invalidates the client in cache
    */
   public void releaseClient(XceiverClientSpi client, boolean invalidateClient) {
+    releaseClient(client, invalidateClient, false);
+  }
+
+  /**
+   * Releases a read XceiverClientSpi after use.
+   *
+   * @param client client to release
+   * @param invalidateClient if true, invalidates the client in cache
+   */
+  public void releaseClientForReadData(XceiverClientSpi client,
+      boolean invalidateClient) {
+    releaseClient(client, invalidateClient, true);
+  }
+
+  private void releaseClient(XceiverClientSpi client, boolean invalidateClient,
+      boolean read) {
     Preconditions.checkNotNull(client);
     synchronized (clientCache) {
       client.decrementReference();
       if (invalidateClient) {
         Pipeline pipeline = client.getPipeline();
-        String key = pipeline.getId().getId().toString() + pipeline.getType();
+        String key = getPipelineCacheKey(pipeline, read);
         XceiverClientSpi cachedClient = clientCache.getIfPresent(key);
         if (cachedClient == client) {
           clientCache.invalidate(key);
@@ -150,11 +193,13 @@ public class XceiverClientManager implements Closeable {
     }
   }
 
-  private XceiverClientSpi getClient(Pipeline pipeline)
+  private XceiverClientSpi getClient(Pipeline pipeline, boolean forRead)
       throws IOException {
     HddsProtos.ReplicationType type = pipeline.getType();
     try {
-      String key = pipeline.getId().getId().toString() + type;
+      // Create a separate client for reads so that the read can target a
+      // different pipeline node chosen by network topology.
+      String key = getPipelineCacheKey(pipeline, forRead);
       // Append user short name to key to prevent a different user
       // from using same instance of xceiverClient.
       key = isSecurityEnabled ?
@@ -184,6 +229,19 @@ public class XceiverClientManager implements Closeable {
     }
   }
 
+  private String getPipelineCacheKey(Pipeline pipeline, boolean forRead) {
+    String key = pipeline.getId().getId().toString() + pipeline.getType();
+    if (topologyAwareRead && forRead) {
+      try {
+        key += pipeline.getClosestNode().getHostName();
+      } catch (IOException e) {
+        LOG.error("Failed to get closest node to create pipeline cache key:" +
+            e.getMessage());
+      }
+    }
+    return key;
+  }
+
   /**
    * Close and remove all the cached clients.
    */
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index e66db5f..35807f4 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -151,7 +151,7 @@ public class BlockInputStream extends InputStream implements Seekable {
       pipeline = Pipeline.newBuilder(pipeline)
           .setType(HddsProtos.ReplicationType.STAND_ALONE).build();
     }
-    xceiverClient = xceiverClientManager.acquireClient(pipeline);
+    xceiverClient = xceiverClientManager.acquireClientForReadData(pipeline);
     boolean success = false;
     List<ChunkInfo> chunks;
     try {
@@ -170,7 +170,7 @@ public class BlockInputStream extends InputStream implements Seekable {
       success = true;
     } finally {
       if (!success) {
-        xceiverClientManager.releaseClient(xceiverClient, false);
+        xceiverClientManager.releaseClientForReadData(xceiverClient, false);
       }
     }
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index 34de028..31e4df0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -76,6 +76,7 @@ public class DatanodeDetails extends NodeImpl implements
     this.ipAddress = datanodeDetails.ipAddress;
     this.hostName = datanodeDetails.hostName;
     this.ports = datanodeDetails.ports;
+    this.setNetworkName(datanodeDetails.getNetworkName());
   }
 
   /**
@@ -192,6 +193,12 @@ public class DatanodeDetails extends NodeImpl implements
       builder.addPort(newPort(
           Port.Name.valueOf(port.getName().toUpperCase()), port.getValue()));
     }
+    if (datanodeDetailsProto.hasNetworkLocation()) {
+      builder.setNetworkLocation(datanodeDetailsProto.getNetworkLocation());
+    }
+    if (datanodeDetailsProto.hasNetworkName()) {
+      builder.setNetworkName(datanodeDetailsProto.getNetworkName());
+    }
     return builder.build();
   }
 
@@ -213,6 +220,7 @@ public class DatanodeDetails extends NodeImpl implements
       builder.setCertSerialId(certSerialId);
     }
     builder.setNetworkLocation(getNetworkLocation());
+    builder.setNetworkName(getNetworkName());
 
     for (Port port : ports) {
       builder.addPorts(HddsProtos.Port.newBuilder()
@@ -268,6 +276,7 @@ public class DatanodeDetails extends NodeImpl implements
     private String id;
     private String ipAddress;
     private String hostName;
+    private String networkName;
     private String networkLocation;
     private List<Port> ports;
     private String certSerialId;
@@ -314,6 +323,17 @@ public class DatanodeDetails extends NodeImpl implements
     }
 
     /**
+     * Sets the network name of DataNode.
+     *
+     * @param name network name
+     * @return DatanodeDetails.Builder
+     */
+    public Builder setNetworkName(String name) {
+      this.networkName = name;
+      return this;
+    }
+
+    /**
      * Sets the network location of DataNode.
      *
      * @param loc location
@@ -358,8 +378,12 @@ public class DatanodeDetails extends NodeImpl implements
       if (networkLocation == null) {
         networkLocation = NetConstants.DEFAULT_RACK;
       }
-      return new DatanodeDetails(id, ipAddress, hostName, networkLocation,
-          ports, certSerialId);
+      DatanodeDetails dn = new DatanodeDetails(id, ipAddress, hostName,
+          networkLocation, ports, certSerialId);
+      if (networkName != null) {
+        dn.setNetworkName(networkName);
+      }
+      return dn;
     }
   }
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 1213dee..4b3b89d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -368,6 +368,10 @@ public final class ScmConfigKeys {
       "ozone.scm.network.topology.schema.file";
   public static final String OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT =
       "network-topology-default.xml";
+  public static final String DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED =
+      "dfs.network.topology.aware.read.enable";
+  public static final String DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT =
+      "true";
 
   public static final String HDDS_TRACING_ENABLED = "hdds.tracing.enabled";
   public static final boolean HDDS_TRACING_ENABLED_DEFAULT = true;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java
index a3d3680..8d8571d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.net;
 
 import java.util.Collection;
+import java.util.List;
 
 /**
  * The interface defines a network topology.
@@ -246,5 +247,6 @@ public interface NetworkTopology {
    * @param nodes     Available replicas with the requested data
    * @param activeLen Number of active nodes at the front of the array
    */
-  void sortByDistanceCost(Node reader, Node[] nodes, int activeLen);
+  List<? extends Node> sortByDistanceCost(Node reader,
+      List<? extends Node> nodes, int activeLen);
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
index 8613ed7..1a03215 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -545,10 +546,6 @@ public class NetworkTopologyImpl implements NetworkTopology{
           ancestorGen);
       return null;
     }
-    LOG.debug("Choosing random from \"{}\" available nodes on node \"{}\"," +
-            " scope=\"{}\", excludedScope=\"{}\", excludeNodes=\"{}\".",
-        availableNodes, scopeNode, scopeNode.getNetworkFullPath(),
-        excludedScope, excludedNodes);
 
     // scope is a Leaf node
     if (!(scopeNode instanceof InnerNode)) {
@@ -556,15 +553,21 @@ public class NetworkTopologyImpl implements NetworkTopology{
     }
 
     Node ret;
+    int nodeIndex;
     if (leafIndex >= 0) {
-      ret = ((InnerNode)scopeNode).getLeaf(leafIndex % availableNodes,
-          excludedScope, mutableExNodes, ancestorGen);
+      nodeIndex = leafIndex % availableNodes;
+      ret = ((InnerNode)scopeNode).getLeaf(nodeIndex, excludedScope,
+          mutableExNodes, ancestorGen);
     } else {
-      final int index = ThreadLocalRandom.current().nextInt(availableNodes);
-      ret = ((InnerNode)scopeNode).getLeaf(index, excludedScope, mutableExNodes,
-          ancestorGen);
+      nodeIndex = ThreadLocalRandom.current().nextInt(availableNodes);
+      ret = ((InnerNode)scopeNode).getLeaf(nodeIndex, excludedScope,
+          mutableExNodes, ancestorGen);
     }
-    LOG.debug("chooseRandom return {}", ret);
+    LOG.debug("Choosing node[index={},random={}] from \"{}\" available nodes" +
+            " scope=\"{}\", excludedScope=\"{}\", excludeNodes=\"{}\".",
+        nodeIndex, (leafIndex == -1 ? "true" : "false"), availableNodes,
+        scopeNode.getNetworkFullPath(), excludedScope, excludedNodes);
+    LOG.debug("Chosen node = {}", (ret == null ? "not found" : ret.toString()));
     return ret;
   }
 
@@ -583,13 +586,16 @@ public class NetworkTopologyImpl implements NetworkTopology{
         (node1 == null && node2 == null))  {
       return 0;
     }
+    if (node1 == null || node2 == null) {
+      LOG.warn("One of the nodes is a null pointer");
+      return Integer.MAX_VALUE;
+    }
     int cost = 0;
     netlock.readLock().lock();
     try {
-      if (node1 == null || node2 == null ||
-          (node1.getAncestor(maxLevel - 1) != clusterTree) ||
+      if ((node1.getAncestor(maxLevel - 1) != clusterTree) ||
           (node2.getAncestor(maxLevel - 1) != clusterTree)) {
-        LOG.warn("One of the nodes is a null pointer");
+        LOG.warn("One of the nodes is outside of network topology");
         return Integer.MAX_VALUE;
       }
       int level1 = node1.getLevel();
@@ -630,17 +636,21 @@ public class NetworkTopologyImpl implements NetworkTopology{
    * @param nodes     Available replicas with the requested data
    * @param activeLen Number of active nodes at the front of the array
    */
-  public void sortByDistanceCost(Node reader, Node[] nodes, int activeLen) {
+  public List<? extends Node> sortByDistanceCost(Node reader,
+      List<? extends Node> nodes, int activeLen) {
     /** Sort weights for the nodes array */
+    if (reader == null) {
+      return nodes;
+    }
     int[] costs = new int[activeLen];
     for (int i = 0; i < activeLen; i++) {
-      costs[i] = getDistanceCost(reader, nodes[i]);
+      costs[i] = getDistanceCost(reader, nodes.get(i));
     }
     // Add cost/node pairs to a TreeMap to sort
     TreeMap<Integer, List<Node>> tree = new TreeMap<Integer, List<Node>>();
     for (int i = 0; i < activeLen; i++) {
       int cost = costs[i];
-      Node node = nodes[i];
+      Node node = nodes.get(i);
       List<Node> list = tree.get(cost);
       if (list == null) {
         list = Lists.newArrayListWithExpectedSize(1);
@@ -648,17 +658,20 @@ public class NetworkTopologyImpl implements NetworkTopology{
       }
       list.add(node);
     }
-    int idx = 0;
+
+    List<Node> ret = new ArrayList<>();
     for (List<Node> list: tree.values()) {
       if (list != null) {
         Collections.shuffle(list);
         for (Node n: list) {
-          nodes[idx] = n;
-          idx++;
+          ret.add(n);
         }
       }
     }
-    Preconditions.checkState(idx == activeLen, "Wrong number of nodes sorted!");
+
+    Preconditions.checkState(ret.size() == activeLen,
+        "Wrong number of nodes sorted!");
+    return ret;
   }
 
   /**
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 7f75dd1..0a91f6b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -25,9 +25,12 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,12 +41,16 @@ import java.util.stream.Collectors;
  */
 public final class Pipeline {
 
+  private static final Logger LOG = LoggerFactory
+      .getLogger(Pipeline.class);
   private final PipelineID id;
   private final ReplicationType type;
   private final ReplicationFactor factor;
 
   private PipelineState state;
   private Map<DatanodeDetails, Long> nodeStatus;
+  // nodes ordered by distance to the client
+  private ThreadLocal<List<DatanodeDetails>> nodesInOrder = new ThreadLocal<>();
 
   /**
    * The immutable properties of pipeline object is used in
@@ -112,6 +119,14 @@ public final class Pipeline {
     return nodeStatus.keySet().iterator().next();
   }
 
+  public DatanodeDetails getClosestNode() throws IOException {
+    if (nodesInOrder.get() == null || nodesInOrder.get().isEmpty()) {
+      LOG.debug("Nodes in order is empty, delegate to getFirstNode");
+      return getFirstNode();
+    }
+    return nodesInOrder.get().get(0);
+  }
+
   public boolean isClosed() {
     return state == PipelineState.CLOSED;
   }
@@ -120,6 +135,18 @@ public final class Pipeline {
     return state == PipelineState.OPEN;
   }
 
+  public void setNodesInOrder(List<DatanodeDetails> nodes) {
+    nodesInOrder.set(nodes);
+  }
+
+  public List<DatanodeDetails> getNodesInOrder() {
+    if (nodesInOrder.get() == null || nodesInOrder.get().isEmpty()) {
+      LOG.debug("Nodes in order is empty, delegate to getNodes");
+      return getNodes();
+    }
+    return nodesInOrder.get();
+  }
+
   void reportDatanode(DatanodeDetails dn) throws IOException {
     if (nodeStatus.get(dn) == null) {
       throw new IOException(
@@ -152,6 +179,22 @@ public final class Pipeline {
         .addAllMembers(nodeStatus.keySet().stream()
             .map(DatanodeDetails::getProtoBufMessage)
             .collect(Collectors.toList()));
+    // To save message size on the wire, only transfer the node order derived
+    // from network topology.
+    List<DatanodeDetails> nodes = nodesInOrder.get();
+    if (nodes != null && !nodes.isEmpty()) {
+      for (int i = 0; i < nodes.size(); i++) {
+        Iterator<DatanodeDetails> it = nodeStatus.keySet().iterator();
+        for (int j = 0; j < nodeStatus.keySet().size(); j++) {
+          if (it.next().equals(nodes.get(i))) {
+            builder.addMemberOrders(j);
+            break;
+          }
+        }
+      }
+      LOG.info("Serialize pipeline {} with nodesInOrder{ }", id.toString(),
+          nodes);
+    }
     return builder.build();
   }
 
@@ -164,10 +207,10 @@ public final class Pipeline {
         .setState(PipelineState.fromProtobuf(pipeline.getState()))
         .setNodes(pipeline.getMembersList().stream()
             .map(DatanodeDetails::getFromProtoBuf).collect(Collectors.toList()))
+        .setNodesInOrder(pipeline.getMemberOrdersList())
         .build();
   }
 
-
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -228,6 +271,8 @@ public final class Pipeline {
     private ReplicationFactor factor = null;
     private PipelineState state = null;
     private Map<DatanodeDetails, Long> nodeStatus = null;
+    private List<Integer> nodeOrder = null;
+    private List<DatanodeDetails> nodesInOrder = null;
 
     public Builder() {}
 
@@ -237,6 +282,7 @@ public final class Pipeline {
       this.factor = pipeline.factor;
       this.state = pipeline.state;
       this.nodeStatus = pipeline.nodeStatus;
+      this.nodesInOrder = pipeline.nodesInOrder.get();
     }
 
     public Builder setId(PipelineID id1) {
@@ -265,13 +311,42 @@ public final class Pipeline {
       return this;
     }
 
+    public Builder setNodesInOrder(List<Integer> orders) {
+      this.nodeOrder = orders;
+      return this;
+    }
+
     public Pipeline build() {
       Preconditions.checkNotNull(id);
       Preconditions.checkNotNull(type);
       Preconditions.checkNotNull(factor);
       Preconditions.checkNotNull(state);
       Preconditions.checkNotNull(nodeStatus);
-      return new Pipeline(id, type, factor, state, nodeStatus);
+      Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus);
+
+      if (nodeOrder != null && !nodeOrder.isEmpty()) {
+        // This branch is for build from ProtoBuf
+        List<DatanodeDetails> nodesWithOrder = new ArrayList<>();
+        for(int i = 0; i < nodeOrder.size(); i++) {
+          int nodeIndex = nodeOrder.get(i);
+          Iterator<DatanodeDetails> it = nodeStatus.keySet().iterator();
+          while(it.hasNext() && nodeIndex >= 0) {
+            DatanodeDetails node = it.next();
+            if (nodeIndex == 0) {
+              nodesWithOrder.add(node);
+              break;
+            }
+            nodeIndex--;
+          }
+        }
+        LOG.info("Deserialize nodesInOrder {} in pipeline {}", nodesWithOrder,
+            id.toString());
+        pipeline.setNodesInOrder(nodesWithOrder);
+      } else if (nodesInOrder != null){
+        // This branch is for pipeline clone
+        pipeline.setNodesInOrder(nodesInOrder);
+      }
+      return pipeline;
     }
   }
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
index 12b5912..18045f8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.protocol;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.security.KerberosInfo;
@@ -74,4 +75,12 @@ public interface ScmBlockLocationProtocol extends Closeable {
    * Gets the Clusterid and SCM Id from SCM.
    */
   ScmInfo getScmInfo() throws IOException;
+
+  /**
+   * Sort datanodes by distance to the client.
+   * @param nodes list of network names of the nodes.
+   * @param clientMachine client address; can be a hostname or an IP address.
+   */
+  List<DatanodeDetails> sortDatanodes(List<String> nodes,
+      String clientMachine) throws IOException;
 }
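
A hedged sketch of how the new ScmBlockLocationProtocol#sortDatanodes call might be used on the OzoneManager side. The method signature is taken from the interface change above; the wrapper class, the node network names, and the client address are made-up illustrative values (in OM they would come from a key's block locations and the RPC caller address).

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;

/** Illustrative sketch only; not part of this commit. */
public final class SortDatanodesSketch {
  private SortDatanodesSketch() { }

  public static List<DatanodeDetails> sortForClient(
      ScmBlockLocationProtocol scmBlockClient) throws IOException {
    // Illustrative values: replica network names (IP or hostname) and the
    // address of the reading client.
    List<String> nodeNetworkNames =
        Arrays.asList("10.5.0.4", "10.5.0.5", "10.5.0.6");
    String clientMachine = "10.5.0.7";
    // SCM resolves each name in its topology and returns the datanodes
    // sorted by network distance from the client, closest first.
    return scmBlockClient.sortDatanodes(nodeNetworkNames, clientMachine);
  }
}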
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index af53ea1..a262bb5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -24,6 +24,7 @@ import java.util.stream.Collectors;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationRequest;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationResponse;
@@ -34,6 +35,10 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Allo
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.KeyBlocks;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
+    .SortDatanodesRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
+    .SortDatanodesResponseProto;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -227,6 +232,35 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
     return builder.build();
   }
 
+  /**
+   * Sort the datanodes based on distance from the client.
+   * @return sorted list of DatanodeDetails
+   * @throws IOException
+   */
+  @Override
+  public List<DatanodeDetails> sortDatanodes(List<String> nodes,
+      String clientMachine) throws IOException {
+    SortDatanodesRequestProto request = SortDatanodesRequestProto
+        .newBuilder()
+        .addAllNodeNetworkName(nodes)
+        .setClient(clientMachine)
+        .build();
+    SCMBlockLocationRequest wrapper = createSCMBlockRequest(
+        Type.SortDatanodes)
+        .setSortDatanodesRequest(request)
+        .build();
+
+    final SCMBlockLocationResponse wrappedResponse =
+        handleError(submitRequest(wrapper));
+    SortDatanodesResponseProto resp =
+        wrappedResponse.getSortDatanodesResponse();
+    List<DatanodeDetails> results = new ArrayList<>(resp.getNodeCount());
+    results.addAll(resp.getNodeList().stream()
+        .map(node -> DatanodeDetails.getFromProtoBuf(node))
+        .collect(Collectors.toList()));
+    return results;
+  }
+
   @Override
   public Object getUnderlyingProxyObject() {
     return rpcProxy;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index c29f395..d0ba60d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -230,7 +230,7 @@ public final class ContainerProtocolCalls  {
         ReadChunkRequestProto.newBuilder()
             .setBlockID(blockID.getDatanodeBlockIDProtobuf())
             .setChunkData(chunk);
-    String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
+    String id = xceiverClient.getPipeline().getClosestNode().getUuidString();
     ContainerCommandRequestProto.Builder builder =
         ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk)
             .setContainerID(blockID.getContainerID())
@@ -494,7 +494,7 @@ public final class ContainerProtocolCalls  {
         GetSmallFileRequestProto
             .newBuilder().setBlock(getBlock)
             .build();
-    String id = client.getPipeline().getFirstNode().getUuidString();
+    String id = client.getPipeline().getClosestNode().getUuidString();
 
     ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
         .newBuilder()
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
index f662955..2dc8df7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
@@ -35,7 +35,8 @@ public enum SCMAction implements AuditAction {
   CLOSE_PIPELINE,
   DELETE_CONTAINER,
   IN_SAFE_MODE,
-  FORCE_EXIT_SAFE_MODE;
+  FORCE_EXIT_SAFE_MODE,
+  SORT_DATANODE;
 
   @Override
   public String getAction() {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
index 935d240..5c3648e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -23,6 +23,7 @@ import io.opentracing.Scope;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
     .AllocateBlockResponse;
 import org.apache.hadoop.hdds.scm.ScmInfo;
@@ -50,6 +51,10 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
     .SCMBlockLocationRequest;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
     .Status;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
+    .SortDatanodesRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
+    .SortDatanodesResponseProto;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
@@ -114,6 +119,10 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
         response.setGetScmInfoResponse(
             getScmInfo(request.getGetScmInfoRequest()));
         break;
+      case SortDatanodes:
+        response.setSortDatanodesResponse(
+            sortDatanodes(request.getSortDatanodesRequest()));
+        break;
       default:
         // Should never happen
         throw new IOException("Unknown Operation "+request.getCmdType()+
@@ -193,4 +202,21 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
         .setScmId(scmInfo.getScmId())
         .build();
   }
+
+  public SortDatanodesResponseProto sortDatanodes(
+      SortDatanodesRequestProto request) throws ServiceException {
+    SortDatanodesResponseProto.Builder resp =
+        SortDatanodesResponseProto.newBuilder();
+    try {
+      List<String> nodeList = request.getNodeNetworkNameList();
+      final List<DatanodeDetails> results =
+          impl.sortDatanodes(nodeList, request.getClient());
+      if (results != null && results.size() > 0) {
+        results.stream().forEach(dn -> resp.addNode(dn.getProtoBufMessage()));
+      }
+      return resp.build();
+    } catch (IOException ex) {
+      throw new ServiceException(ex);
+    }
+  }
 }
diff --git a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
index 81144ab..ded0d02 100644
--- a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
@@ -37,6 +37,7 @@ enum Type {
   AllocateScmBlock   = 11;
   DeleteScmKeyBlocks = 12;
   GetScmInfo         = 13;
+  SortDatanodes      = 14;
 }
 
 message SCMBlockLocationRequest {
@@ -51,6 +52,7 @@ message SCMBlockLocationRequest {
   optional AllocateScmBlockRequestProto       allocateScmBlockRequest   = 11;
   optional DeleteScmKeyBlocksRequestProto     deleteScmKeyBlocksRequest = 12;
   optional hadoop.hdds.GetScmInfoRequestProto getScmInfoRequest         = 13;
+  optional SortDatanodesRequestProto          sortDatanodesRequest      = 14;
 }
 
 message SCMBlockLocationResponse {
@@ -71,6 +73,7 @@ message SCMBlockLocationResponse {
   optional AllocateScmBlockResponseProto       allocateScmBlockResponse   = 11;
   optional DeleteScmKeyBlocksResponseProto     deleteScmKeyBlocksResponse = 12;
   optional hadoop.hdds.GetScmInfoResponseProto getScmInfoResponse         = 13;
+  optional SortDatanodesResponseProto          sortDatanodesResponse      = 14;
 }
 
 /**
@@ -186,6 +189,19 @@ message AllocateScmBlockResponseProto {
 }
 
 /**
+ * Datanode sort request sent by OM to SCM; it contains
+ * a list of datanodes.
+ */
+message SortDatanodesRequestProto{
+  required string client = 1;
+  repeated string nodeNetworkName = 2;
+}
+
+message SortDatanodesResponseProto{
+  repeated DatanodeDetailsProto node = 1;
+}
+
+/**
  * Protocol used from OzoneManager to StorageContainerManager.
  * See request and response messages for details of the RPC calls.
  */
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto
index 2d5cb03..6475f4c 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -34,7 +34,9 @@ message DatanodeDetailsProto {
     required string hostName = 3;      // hostname
     repeated Port ports = 4;
     optional string certSerialId = 5;   // Certificate serial id.
-    optional string networkLocation = 6; // Network topology location
+    // network name; can be an IP address or a host name
+    optional string networkName = 6;
+    optional string networkLocation = 7; // Network topology location
 }
 
 /**
@@ -71,6 +73,7 @@ message Pipeline {
     optional ReplicationType type = 4 [default = STAND_ALONE];
     optional ReplicationFactor factor = 5 [default = ONE];
     required PipelineID id = 6;
+    repeated uint32 memberOrders = 7;
 }
 
 message KeyValue {
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 27b02e6..7fd4ad0 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2346,6 +2346,14 @@
     </description>
   </property>
   <property>
+    <name>dfs.network.topology.aware.read.enable</name>
+    <value>true</value>
+    <tag>OZONE, PERFORMANCE</tag>
+    <description>
+      Whether to enable topology-aware reads to improve read performance.
+    </description>
+  </property>
+  <property>
     <name>ozone.recon.container.db.impl</name>
     <value>RocksDB</value>
     <tag>OZONE, RECON, STORAGE</tag>
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNetworkTopologyImpl.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNetworkTopologyImpl.java
index 0edfb07..e0041a4 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNetworkTopologyImpl.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNetworkTopologyImpl.java
@@ -784,15 +784,16 @@ public class TestNetworkTopologyImpl {
       for (Node[] nodeList : nodes) {
         int length = nodeList.length;
         while (length > 0) {
-          cluster.sortByDistanceCost(reader, nodeList, length);
-          for (int i = 0; i < nodeList.length; i++) {
-            if ((i + 1) < nodeList.length) {
-              int cost1 = cluster.getDistanceCost(reader, nodeList[i]);
-              int cost2 = cluster.getDistanceCost(reader, nodeList[i + 1]);
+          List<? extends Node> ret = cluster.sortByDistanceCost(reader,
+              Arrays.asList(nodeList), length);
+          for (int i = 0; i < ret.size(); i++) {
+            if ((i + 1) < ret.size()) {
+              int cost1 = cluster.getDistanceCost(reader, ret.get(i));
+              int cost2 = cluster.getDistanceCost(reader, ret.get(i + 1));
               assertTrue("reader:" + (reader != null ?
                   reader.getNetworkFullPath() : "null") +
-                  ",node1:" + nodeList[i].getNetworkFullPath() +
-                  ",node2:" + nodeList[i + 1].getNetworkFullPath() +
+                  ",node1:" + ret.get(i).getNetworkFullPath() +
+                  ",node2:" + ret.get(i + 1).getNetworkFullPath() +
                   ",cost1:" + cost1 + ",cost2:" + cost2,
                   cost1 == Integer.MAX_VALUE || cost1 <= cost2);
             }
@@ -803,20 +804,22 @@ public class TestNetworkTopologyImpl {
     }
 
     // sort all nodes
-    Node[] nodeList = dataNodes.clone();
+    List<Node> nodeList = Arrays.asList(dataNodes.clone());
     for (Node reader : readers) {
-      int length = nodeList.length;
+      int length = nodeList.size();
       while (length >= 0) {
-        cluster.sortByDistanceCost(reader, nodeList, length);
-        for (int i = 0; i < nodeList.length; i++) {
-          if ((i + 1) < nodeList.length) {
-            int cost1 = cluster.getDistanceCost(reader, nodeList[i]);
-            int cost2 = cluster.getDistanceCost(reader, nodeList[i + 1]);
+        List<? extends Node> sortedNodeList =
+            cluster.sortByDistanceCost(reader, nodeList, length);
+        for (int i = 0; i < sortedNodeList.size(); i++) {
+          if ((i + 1) < sortedNodeList.size()) {
+            int cost1 = cluster.getDistanceCost(reader, sortedNodeList.get(i));
+            int cost2 = cluster.getDistanceCost(
+                reader, sortedNodeList.get(i + 1));
             // node can be removed when called in testConcurrentAccess
             assertTrue("reader:" + (reader != null ?
                 reader.getNetworkFullPath() : "null") +
-                ",node1:" + nodeList[i].getNetworkFullPath() +
-                ",node2:" + nodeList[i + 1].getNetworkFullPath() +
+                ",node1:" + sortedNodeList.get(i).getNetworkFullPath() +
+                ",node2:" + sortedNodeList.get(i + 1).getNetworkFullPath() +
                 ",cost1:" + cost1 + ",cost2:" + cost2,
                 cost1 == Integer.MAX_VALUE || cost1 <= cost2);
           }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 93eba61..6beb0a3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -566,6 +566,8 @@ public class SCMNodeManager implements NodeManager {
       node = clusterMap.getNode(location + NetConstants.PATH_SEPARATOR_STR +
           address);
     }
+    LOG.debug("Get node for {} return {}", address, (node == null ?
+        "not found" : node.getNetworkFullPath()));
     return node == null ? null : (DatanodeDetails)node;
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index a14d003..2eb9d47 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Maps;
 import com.google.protobuf.BlockingService;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
@@ -33,6 +34,8 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
 import org.apache.hadoop.io.IOUtils;
@@ -280,6 +283,35 @@ public class SCMBlockProtocolServer implements
   }
 
   @Override
+  public List<DatanodeDetails> sortDatanodes(List<String> nodes,
+      String clientMachine) throws IOException {
+    boolean auditSuccess = true;
+    try{
+      NodeManager nodeManager = scm.getScmNodeManager();
+      Node client = nodeManager.getNode(clientMachine);
+      List<Node> nodeList = new ArrayList<>();
+      nodes.stream().forEach(path -> nodeList.add(nodeManager.getNode(path)));
+      List<? extends Node> sortedNodeList = scm.getClusterMap()
+          .sortByDistanceCost(client, nodeList, nodes.size());
+      List<DatanodeDetails> ret = new ArrayList<>();
+      sortedNodeList.stream().forEach(node -> ret.add((DatanodeDetails)node));
+      return ret;
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(SCMAction.SORT_DATANODE, null, ex)
+      );
+      throw ex;
+    } finally {
+      if(auditSuccess) {
+        AUDIT.logReadSuccess(
+            buildAuditMessageForSuccess(SCMAction.SORT_DATANODE, null)
+        );
+      }
+    }
+  }
+
+  @Override
   public AuditMessage buildAuditMessageForSuccess(
       AuditAction op, Map<String, String> auditMap) {
     return new AuditMessage.Builder()
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
index b45ce6f..9bbabd1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.server;
 
 
 import org.apache.hadoop.hdds.scm.block.BlockManager;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager;
@@ -65,6 +66,7 @@ public final class SCMConfigurator {
   private SCMSafeModeManager scmSafeModeManager;
   private CertificateServer certificateServer;
   private SCMMetadataStore metadataStore;
+  private NetworkTopology networkTopology;
 
   /**
    * Allows user to specify a version of Node manager to use with this SCM.
@@ -138,6 +140,15 @@ public final class SCMConfigurator {
   }
 
   /**
+   * Allows the user to specify a custom version of the network topology
+   * cluster to be used with this SCM.
+   * @param networkTopology - network topology cluster.
+   */
+  public void setNetworkTopology(NetworkTopology networkTopology) {
+    this.networkTopology = networkTopology;
+  }
+
+  /**
    * Gets SCM Node Manager.
    * @return Node Manager.
    */
@@ -200,4 +211,12 @@ public final class SCMConfigurator {
   public SCMMetadataStore getMetadataStore() {
     return metadataStore;
   }
+
+  /**
+   * Get network topology cluster tree.
+   * @return NetworkTopology.
+   */
+  public NetworkTopology getNetworkTopology() {
+    return networkTopology;
+  }
 }
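
A small sketch of how the new SCMConfigurator hook might be used to inject a custom cluster map, e.g. in tests. setNetworkTopology and the NetworkTopologyImpl(conf) constructor appear in this patch; the wrapper class and method name are illustrative assumptions only.

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.server.SCMConfigurator;

/** Illustrative sketch only; not part of this commit. */
public final class CustomTopologySketch {
  private CustomTopologySketch() { }

  public static SCMConfigurator withCustomTopology(OzoneConfiguration conf) {
    // Any NetworkTopology implementation could be supplied here; a plain
    // NetworkTopologyImpl built from the configuration is used for illustration.
    NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
    SCMConfigurator configurator = new SCMConfigurator();
    configurator.setNetworkTopology(clusterMap);
    // StorageContainerManager#initializeSystemManagers will then use this
    // clusterMap instead of creating its own (see the SCM diff below).
    return configurator;
  }
}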
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 4d02ee4..6296df8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -374,7 +374,11 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   private void initializeSystemManagers(OzoneConfiguration conf,
                                        SCMConfigurator configurator)
       throws IOException {
-    clusterMap = new NetworkTopologyImpl(conf);
+    if (configurator.getNetworkTopology() != null) {
+      clusterMap = configurator.getNetworkTopology();
+    } else {
+      clusterMap = new NetworkTopologyImpl(conf);
+    }
 
     if(configurator.getScmNodeManager() != null) {
       scmNodeManager = configurator.getScmNodeManager();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 0b7437e..e2ce7de 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -124,8 +124,8 @@ public final class TestUtils {
         + "." + random.nextInt(256)
         + "." + random.nextInt(256)
         + "." + random.nextInt(256);
-    return createDatanodeDetails(uuid.toString(), "localhost", ipAddress,
-        null);
+    return createDatanodeDetails(uuid.toString(), "localhost" + "-" + ipAddress,
+        ipAddress, null);
   }
 
   /**
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 19fb3a7..64eba29 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -19,6 +19,9 @@ package org.apache.hadoop.hdds.scm.container;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.net.NetConstants;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.Node;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -83,6 +86,7 @@ public class MockNodeManager implements NodeManager {
   private final Map<UUID, List<SCMCommand>> commandMap;
   private final Node2PipelineMap node2PipelineMap;
   private final Node2ContainerMap node2ContainerMap;
+  private NetworkTopology clusterMap;
 
   public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
     this.healthyNodes = new LinkedList<>();
@@ -366,6 +370,9 @@ public class MockNodeManager implements NodeManager {
     try {
       node2ContainerMap.insertNewDatanode(datanodeDetails.getUuid(),
           Collections.emptySet());
+      if (clusterMap != null) {
+        clusterMap.add(datanodeDetails);
+      }
     } catch (SCMException e) {
       e.printStackTrace();
     }
@@ -453,7 +460,12 @@ public class MockNodeManager implements NodeManager {
 
   @Override
   public DatanodeDetails getNode(String address) {
-    return null;
+    Node node = clusterMap.getNode(NetConstants.DEFAULT_RACK + "/" + address);
+    return node == null ? null : (DatanodeDetails)node;
+  }
+
+  public void setNetworkTopology(NetworkTopology topology) {
+    this.clusterMap = topology;
   }
 
   /**
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index ae81071..4657fa0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -51,6 +51,7 @@ import org.junit.rules.ExpectedException;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
@@ -1147,6 +1148,15 @@ public class TestSCMNodeManager {
       List<DatanodeDetails> nodeList = nodeManager.getAllNodes();
       nodeList.stream().forEach(node ->
           Assert.assertTrue(node.getNetworkLocation().equals("/rack1")));
+
+      // test get node
+      if (useHostname) {
+        Arrays.stream(hostNames).forEach(hostname ->
+            Assert.assertNotNull(nodeManager.getNode(hostname)));
+      } else {
+        Arrays.stream(ipAddress).forEach(ip ->
+            Assert.assertNotNull(nodeManager.getNode(ip)));
+      }
     }
   }
 }
diff --git a/hadoop-ozone/common/src/main/bin/ozone b/hadoop-ozone/common/src/main/bin/ozone
index f6fe147..838651c 100755
--- a/hadoop-ozone/common/src/main/bin/ozone
+++ b/hadoop-ozone/common/src/main/bin/ozone
@@ -136,6 +136,7 @@ function ozonecmd_case
     ;;
     sh | shell)
       HADOOP_CLASSNAME=org.apache.hadoop.ozone.web.ozShell.OzoneShell
+      HADOOP_OPTS="${HADOOP_OPTS} ${HDFS_OM_SH_OPTS}"
       OZONE_RUN_ARTIFACT_NAME="hadoop-ozone-ozone-manager"
     ;;
     s3)
@@ -171,6 +172,7 @@ function ozonecmd_case
     ;;
     scmcli)
       HADOOP_CLASSNAME=org.apache.hadoop.hdds.scm.cli.SCMCLI
+      HADOOP_OPTS="${HADOOP_OPTS} ${HDFS_SCM_CLI_OPTS}"
       OZONE_RUN_ARTIFACT_NAME="hadoop-hdds-tools"
     ;;
     version)
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-net-topology/.env b/hadoop-ozone/dist/src/main/compose/ozone-topology/.env
similarity index 100%
rename from hadoop-ozone/dist/src/main/compose/ozone-net-topology/.env
rename to hadoop-ozone/dist/src/main/compose/ozone-topology/.env
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-net-topology/docker-compose.yaml b/hadoop-ozone/dist/src/main/compose/ozone-topology/docker-compose.yaml
similarity index 90%
rename from hadoop-ozone/dist/src/main/compose/ozone-net-topology/docker-compose.yaml
rename to hadoop-ozone/dist/src/main/compose/ozone-topology/docker-compose.yaml
index 4f7d5b2..b14f398 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-net-topology/docker-compose.yaml
+++ b/hadoop-ozone/dist/src/main/compose/ozone-topology/docker-compose.yaml
@@ -19,6 +19,7 @@ services:
    datanode_1:
       image: apache/ozone-runner:${HADOOP_RUNNER_VERSION}
       privileged: true #required by the profiler
+      container_name: datanode_1
       volumes:
         - ../..:/opt/hadoop
       ports:
@@ -28,11 +29,12 @@ services:
       env_file:
         - ./docker-config
       networks:
-         service_network:
+         net:
             ipv4_address: 10.5.0.4
    datanode_2:
       image: apache/ozone-runner:${HADOOP_RUNNER_VERSION}
       privileged: true #required by the profiler
+      container_name: datanode_2
       volumes:
         - ../..:/opt/hadoop
       ports:
@@ -42,11 +44,12 @@ services:
       env_file:
         - ./docker-config
       networks:
-         service_network:
+         net:
             ipv4_address: 10.5.0.5
    datanode_3:
       image: apache/ozone-runner:${HADOOP_RUNNER_VERSION}
       privileged: true #required by the profiler
+      container_name: datanode_3
       volumes:
         - ../..:/opt/hadoop
       ports:
@@ -56,11 +59,12 @@ services:
       env_file:
         - ./docker-config
       networks:
-         service_network:
+         net:
             ipv4_address: 10.5.0.6
    datanode_4:
       image: apache/ozone-runner:${HADOOP_RUNNER_VERSION}
       privileged: true #required by the profiler
+      container_name: datanode_4
       volumes:
         - ../..:/opt/hadoop
       ports:
@@ -70,11 +74,12 @@ services:
       env_file:
         - ./docker-config
       networks:
-         service_network:
+         net:
             ipv4_address: 10.5.0.7
    om:
       image: apache/ozone-runner:${HADOOP_RUNNER_VERSION}
       privileged: true #required by the profiler
+      container_name: om
       volumes:
          - ../..:/opt/hadoop
       ports:
@@ -85,11 +90,12 @@ services:
           - ./docker-config
       command: ["/opt/hadoop/bin/ozone","om"]
       networks:
-         service_network:
+         net:
             ipv4_address: 10.5.0.70
    scm:
-      image: apache/ozone-runner::${HADOOP_RUNNER_VERSION}
+      image: apache/ozone-runner:${HADOOP_RUNNER_VERSION}
       privileged: true #required by the profiler
+      container_name: scm
       volumes:
          - ../..:/opt/hadoop
       ports:
@@ -100,10 +106,10 @@ services:
           ENSURE_SCM_INITIALIZED: /data/metadata/scm/current/VERSION
       command: ["/opt/hadoop/bin/ozone","scm"]
       networks:
-         service_network:
+         net:
             ipv4_address: 10.5.0.71
 networks:
-   service_network:
+   net:
      driver: bridge
      ipam:
        config:
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-net-topology/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-topology/docker-config
similarity index 96%
rename from hadoop-ozone/dist/src/main/compose/ozone-net-topology/docker-config
rename to hadoop-ozone/dist/src/main/compose/ozone-topology/docker-config
index ea98240..f5f8f3f 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-net-topology/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone-topology/docker-config
@@ -29,7 +29,8 @@ OZONE-SITE.XML_hdds.profiler.endpoint.enabled=true
 HDFS-SITE.XML_rpc.metrics.quantile.enable=true
 HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300
 HDFS-SITE.XML_net.topology.node.switch.mapping.impl=org.apache.hadoop.net.TableMapping
-HDFS-SITE.XML_net.topology.table.file.name=/opt/hadoop/compose/ozone-net-topology/network-config
+HDFS-SITE.XML_net.topology.table.file.name=/opt/hadoop/compose/ozone-topology/network-config
+HDFS-SITE.XML_dfs.network.topology.aware.read.enable=true
 ASYNC_PROFILER_HOME=/opt/profiler
 LOG4J.PROPERTIES_log4j.rootLogger=DEBUG, ARF
 LOG4J.PROPERTIES_log4j.appender.stdout=org.apache.log4j.ConsoleAppender
@@ -46,6 +47,8 @@ LOG4J.PROPERTIES_log4j.appender.ARF.file=/opt/hadoop/logs/${module.name}-${user.
 HDDS_DN_OPTS=-Dmodule.name=datanode
 HDFS_OM_OPTS=-Dmodule.name=om
 HDFS_STORAGECONTAINERMANAGER_OPTS=-Dmodule.name=scm
+HDFS_OM_SH_OPTS=-Dmodule.name=sh
+HDFS_SCM_CLI_OPTS=-Dmodule.name=scmcli
 
 #Enable this variable to print out all hadoop rpc traffic to the stdout. See http://byteman.jboss.org/ to define your own instrumentation.
 #BYTEMAN_SCRIPT_URL=https://raw.githubusercontent.com/apache/hadoop/trunk/dev-support/byteman/hadooprpc.btm
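
The net.topology.table.file.name entry above points the TableMapping resolver at the compose directory's network-config file. org.apache.hadoop.net.TableMapping expects a two-column, whitespace-separated table that maps a host name or IP address to a rack path. A minimal sketch of such a file, assuming the static datanode addresses from the docker-compose.yaml above (the actual network-config shipped with the compose files may assign racks differently):

    10.5.0.4 /rack1
    10.5.0.5 /rack1
    10.5.0.6 /rack2
    10.5.0.7 /rack2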
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-net-topology/network-config b/hadoop-ozone/dist/src/main/compose/ozone-topology/network-config
similarity index 100%
rename from hadoop-ozone/dist/src/main/compose/ozone-net-topology/network-config
rename to hadoop-ozone/dist/src/main/compose/ozone-topology/network-config
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-net-topology/test.sh b/hadoop-ozone/dist/src/main/compose/ozone-topology/test.sh
similarity index 100%
rename from hadoop-ozone/dist/src/main/compose/ozone-net-topology/test.sh
rename to hadoop-ozone/dist/src/main/compose/ozone-topology/test.sh
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 868e04a..2b510fc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -100,9 +101,11 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.RandomUtils;
 
 import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
 import static org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.either;
+
 import org.junit.Assert;
 
 import static org.junit.Assert.assertEquals;
@@ -114,6 +117,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This is an abstract class to test all the public facing APIs of Ozone
@@ -124,6 +129,8 @@ import org.junit.Test;
  */
 public abstract class TestOzoneRpcClientAbstract {
 
+  static final Logger LOG =
+      LoggerFactory.getLogger(TestOzoneRpcClientAbstract.class);
   private static MiniOzoneCluster cluster = null;
   private static OzoneClient ozClient = null;
   private static ObjectStore store = null;
@@ -140,7 +147,7 @@ public abstract class TestOzoneRpcClientAbstract {
    */
   static void startCluster(OzoneConfiguration conf) throws Exception {
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(10)
+        .setNumDatanodes(3)
         .setScmId(scmId)
         .build();
     cluster.waitForClusterToBeReady();
@@ -664,6 +671,80 @@ public abstract class TestOzoneRpcClientAbstract {
     Assert.assertEquals(value.getBytes().length, keyInfo.getDataSize());
   }
 
+  /**
+   * Tests reading a key and a file with network topology awareness enabled.
+   * @throws IOException
+   */
+  @Test
+  public void testGetKeyAndFileWithNetworkTopology() throws IOException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+
+    String value = "sample value";
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    String keyName = UUID.randomUUID().toString();
+
+    // Write data into a key
+    OzoneOutputStream out = bucket.createKey(keyName,
+        value.getBytes().length, ReplicationType.RATIS,
+        THREE, new HashMap<>());
+    out.write(value.getBytes());
+    out.close();
+
+    // Since the RPC client is outside the cluster, getFirstNode should be
+    // equal to getClosestNode.
+    OmKeyArgs.Builder builder = new OmKeyArgs.Builder();
+    builder.setVolumeName(volumeName).setBucketName(bucketName)
+        .setKeyName(keyName).setRefreshPipeline(true);
+
+    // read key with topology aware read enabled (default)
+    try {
+      OzoneInputStream is = bucket.readKey(keyName);
+      byte[] b = new byte[value.getBytes().length];
+      is.read(b);
+      Assert.assertTrue(Arrays.equals(b, value.getBytes()));
+    } catch (OzoneChecksumException e) {
+      fail("Reading key should success");
+    }
+    // read file with topology aware read enabled (default)
+    try {
+      OzoneInputStream is = bucket.readFile(keyName);
+      byte[] b = new byte[value.getBytes().length];
+      is.read(b);
+      Assert.assertTrue(Arrays.equals(b, value.getBytes()));
+    } catch (OzoneChecksumException e) {
+      fail("Reading file should success");
+    }
+
+    // read key with topology aware read disabled
+    Configuration conf = cluster.getConf();
+    conf.set(ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, "false");
+    OzoneClient newClient = OzoneClientFactory.getRpcClient(conf);
+    ObjectStore newStore = newClient.getObjectStore();
+    OzoneBucket newBucket =
+        newStore.getVolume(volumeName).getBucket(bucketName);
+    try {
+      OzoneInputStream is = newBucket.readKey(keyName);
+      byte[] b = new byte[value.getBytes().length];
+      is.read(b);
+      Assert.assertTrue(Arrays.equals(b, value.getBytes()));
+    } catch (OzoneChecksumException e) {
+      fail("Reading key should success");
+    }
+    // read file with topology aware read disabled
+
+    try {
+      OzoneInputStream is = newBucket.readFile(keyName);
+      byte[] b = new byte[value.getBytes().length];
+      is.read(b);
+      Assert.assertTrue(Arrays.equals(b, value.getBytes()));
+    } catch (OzoneChecksumException e) {
+      fail("Reading file should success");
+    }
+  }
 
   @Test
   public void testPutKeyRatisOneNode()
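
The testGetKeyAndFileWithNetworkTopology test above toggles dfs.network.topology.aware.read.enable programmatically; a client application could do the same before building its RPC client. A minimal sketch, for illustration only (per the tests, topology-aware reads default to enabled in this patch, so an explicit set is mainly needed to turn the behaviour off):

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;
    import org.apache.hadoop.ozone.client.OzoneClient;
    import org.apache.hadoop.ozone.client.OzoneClientFactory;

    // Illustrative client-side setup, not part of the patch.
    OzoneConfiguration conf = new OzoneConfiguration();
    // Set to "false" to keep the old leader-first read order.
    conf.set(ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, "true");
    OzoneClient client = OzoneClientFactory.getRpcClient(conf);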
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
index 7b0b8c4..dc6e407 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
@@ -19,18 +19,37 @@
 package org.apache.hadoop.ozone.client.rpc;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.UUID;
+
+import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
+import static org.junit.Assert.fail;
 
 /**
  * This class is to test all the public facing APIs of Ozone Client with an
  * active OM Ratis server.
  */
 public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
-
+  private static OzoneConfiguration conf;
   /**
    * Create a MiniOzoneCluster for testing.
    * Ozone is made active by setting OZONE_ENABLED = true.
@@ -41,7 +60,7 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
    */
   @BeforeClass
   public static void init() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
+    conf = new OzoneConfiguration();
     conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
     conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true);
     startCluster(conf);
@@ -55,4 +74,77 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
     shutdownCluster();
   }
 
+  /**
+   * Tests reading a key and a file with network topology awareness enabled.
+   * @throws IOException
+   */
+  @Test
+  public void testGetKeyAndFileWithNetworkTopology() throws IOException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+
+    String value = "sample value";
+    getStore().createVolume(volumeName);
+    OzoneVolume volume = getStore().getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    String keyName = UUID.randomUUID().toString();
+
+    // Write data into a key
+    OzoneOutputStream out = bucket.createKey(keyName,
+        value.getBytes().length, ReplicationType.RATIS,
+        THREE, new HashMap<>());
+    out.write(value.getBytes());
+    out.close();
+
+    // Since the RPC client is outside the cluster, getFirstNode should be
+    // equal to getClosestNode.
+    OmKeyArgs.Builder builder = new OmKeyArgs.Builder();
+    builder.setVolumeName(volumeName).setBucketName(bucketName)
+        .setKeyName(keyName).setRefreshPipeline(true);
+
+    // read key with topology aware read enabled (default)
+    try {
+      OzoneInputStream is = bucket.readKey(keyName);
+      byte[] b = new byte[value.getBytes().length];
+      is.read(b);
+      Assert.assertTrue(Arrays.equals(b, value.getBytes()));
+    } catch (OzoneChecksumException e) {
+      fail("Reading key should success");
+    }
+    // read file with topology aware read enabled (default)
+    try {
+      OzoneInputStream is = bucket.readFile(keyName);
+      byte[] b = new byte[value.getBytes().length];
+      is.read(b);
+      Assert.assertTrue(Arrays.equals(b, value.getBytes()));
+    } catch (OzoneChecksumException e) {
+      fail("Reading file should success");
+    }
+
+    // read key with topology aware read disabled
+    conf.set(ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED, "false");
+    OzoneClient newClient = OzoneClientFactory.getRpcClient(conf);
+    ObjectStore newStore = newClient.getObjectStore();
+    OzoneBucket newBucket =
+        newStore.getVolume(volumeName).getBucket(bucketName);
+    try {
+      OzoneInputStream is = newBucket.readKey(keyName);
+      byte[] b = new byte[value.getBytes().length];
+      is.read(b);
+      Assert.assertTrue(Arrays.equals(b, value.getBytes()));
+    } catch (OzoneChecksumException e) {
+      fail("Reading key should success");
+    }
+    // read file with topology aware read disabled
+
+    try {
+      OzoneInputStream is = newBucket.readFile(keyName);
+      byte[] b = new byte[value.getBytes().length];
+      is.read(b);
+      Assert.assertTrue(Arrays.equals(b, value.getBytes()));
+    } catch (OzoneChecksumException e) {
+      fail("Reading file should success");
+    }
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index e9e6b25..0aa301a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -36,7 +36,9 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.TestUtils;
@@ -44,7 +46,12 @@ import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
+import org.apache.hadoop.hdds.scm.net.NodeSchema;
+import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -53,6 +60,7 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
@@ -71,6 +79,7 @@ import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -80,6 +89,11 @@ import org.mockito.Mockito;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
+
+import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
+
 import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
 
 /**
@@ -91,6 +105,7 @@ public class TestKeyManagerImpl {
   private static KeyManagerImpl keyManager;
   private static VolumeManagerImpl volumeManager;
   private static BucketManagerImpl bucketManager;
+  private static NodeManager nodeManager;
   private static StorageContainerManager scm;
   private static ScmBlockLocationProtocol mockScmBlockLocationProtocol;
   private static OzoneConfiguration conf;
@@ -113,9 +128,17 @@ public class TestKeyManagerImpl {
     metadataManager = new OmMetadataManagerImpl(conf);
     volumeManager = new VolumeManagerImpl(metadataManager, conf);
     bucketManager = new BucketManagerImpl(metadataManager);
-    NodeManager nodeManager = new MockNodeManager(true, 10);
+    nodeManager = new MockNodeManager(true, 10);
+    NodeSchema[] schemas = new NodeSchema[]
+        {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
+    NodeSchemaManager schemaManager = NodeSchemaManager.getInstance();
+    schemaManager.init(schemas, false);
+    NetworkTopology clusterMap = new NetworkTopologyImpl(schemaManager);
+    nodeManager.getAllNodes().stream().forEach(node -> clusterMap.add(node));
+    ((MockNodeManager)nodeManager).setNetworkTopology(clusterMap);
     SCMConfigurator configurator = new SCMConfigurator();
     configurator.setScmNodeManager(nodeManager);
+    configurator.setNetworkTopology(clusterMap);
     scm = TestUtils.getScm(conf, configurator);
     scm.start();
     scm.exitSafeMode();
@@ -563,7 +586,7 @@ public class TestKeyManagerImpl {
 
     // lookup for a non-existent file
     try {
-      keyManager.lookupFile(keyArgs);
+      keyManager.lookupFile(keyArgs, null);
       Assert.fail("Lookup file should fail for non existent file");
     } catch (OMException ex) {
       if (ex.getResult() != OMException.ResultCodes.FILE_NOT_FOUND) {
@@ -576,14 +599,15 @@ public class TestKeyManagerImpl {
     keyArgs.setLocationInfoList(
         keySession.getKeyInfo().getLatestVersionLocations().getLocationList());
     keyManager.commitKey(keyArgs, keySession.getId());
-    Assert.assertEquals(keyManager.lookupFile(keyArgs).getKeyName(), keyName);
+    Assert.assertEquals(keyManager.lookupFile(keyArgs, null).getKeyName(),
+        keyName);
 
     // lookup with an empty key name, which resolves to a directory
     keyArgs = createBuilder()
         .setKeyName("")
         .build();
     try {
-      keyManager.lookupFile(keyArgs);
+      keyManager.lookupFile(keyArgs, null);
       Assert.fail("Lookup file should fail for a directory");
     } catch (OMException ex) {
       if (ex.getResult() != OMException.ResultCodes.NOT_A_FILE) {
@@ -597,6 +621,81 @@ public class TestKeyManagerImpl {
   }
 
   @Test
+  public void testLookupKeyWithLocation() throws IOException {
+    String keyName = RandomStringUtils.randomAlphabetic(5);
+    OmKeyArgs keyArgs = createBuilder()
+        .setKeyName(keyName)
+        .build();
+
+    // lookup for a non-existent key
+    try {
+      keyManager.lookupKey(keyArgs, null);
+      Assert.fail("Lookup key should fail for non existent key");
+    } catch (OMException ex) {
+      if (ex.getResult() != OMException.ResultCodes.KEY_NOT_FOUND) {
+        throw ex;
+      }
+    }
+
+    // create a key
+    OpenKeySession keySession = keyManager.createFile(keyArgs, false, false);
+    // randomly select 3 datanodes
+    List<DatanodeDetails> nodeList = new ArrayList<>();
+    nodeList.add((DatanodeDetails)scm.getClusterMap().getNode(
+        0, null, null, null, null, 0));
+    nodeList.add((DatanodeDetails)scm.getClusterMap().getNode(
+        1, null, null, null, null, 0));
+    nodeList.add((DatanodeDetails)scm.getClusterMap().getNode(
+        2, null, null, null, null, 0));
+    Assume.assumeFalse(nodeList.get(0).equals(nodeList.get(1)));
+    Assume.assumeFalse(nodeList.get(0).equals(nodeList.get(2)));
+    // create a pipeline using 3 datanodes
+    Pipeline pipeline = scm.getPipelineManager().createPipeline(
+        ReplicationType.RATIS, ReplicationFactor.THREE, nodeList);
+    List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
+    locationInfoList.add(
+        new OmKeyLocationInfo.Builder().setPipeline(pipeline)
+            .setBlockID(new BlockID(1L, 1L)).build());
+    keyArgs.setLocationInfoList(locationInfoList);
+
+    keyManager.commitKey(keyArgs, keySession.getId());
+
+    OmKeyInfo key = keyManager.lookupKey(keyArgs, null);
+    Assert.assertEquals(key.getKeyName(), keyName);
+    List<OmKeyLocationInfo> keyLocations =
+        key.getLatestVersionLocations().getLocationList();
+    DatanodeDetails leader =
+        keyLocations.get(0).getPipeline().getFirstNode();
+    DatanodeDetails follower1 =
+        keyLocations.get(0).getPipeline().getNodes().get(1);
+    DatanodeDetails follower2 =
+        keyLocations.get(0).getPipeline().getNodes().get(2);
+    Assert.assertNotEquals(leader, follower1);
+    Assert.assertNotEquals(follower1, follower2);
+
+    // lookup key, leader as client
+    OmKeyInfo key1 = keyManager.lookupKey(keyArgs, leader.getNetworkName());
+    Assert.assertEquals(leader, key1.getLatestVersionLocations()
+        .getLocationList().get(0).getPipeline().getClosestNode());
+
+    // lookup key, follower1 as client
+    OmKeyInfo key2 = keyManager.lookupKey(keyArgs, follower1.getNetworkName());
+    Assert.assertEquals(follower1, key2.getLatestVersionLocations()
+        .getLocationList().get(0).getPipeline().getClosestNode());
+
+    // lookup key, follower2 as client
+    OmKeyInfo key3 = keyManager.lookupKey(keyArgs, follower2.getNetworkName());
+    Assert.assertEquals(follower2, key3.getLatestVersionLocations()
+        .getLocationList().get(0).getPipeline().getClosestNode());
+
+    // lookup key, random node as client
+    OmKeyInfo key4 = keyManager.lookupKey(keyArgs,
+        "/d=default-drack/127.0.0.1");
+    Assert.assertEquals(leader, key4.getLatestVersionLocations()
+        .getLocationList().get(0).getPipeline().getClosestNode());
+  }
+
+  @Test
   public void testListStatus() throws IOException {
     String superDir = RandomStringUtils.randomAlphabetic(5);
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
index 5d739c2..a35c474 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
@@ -258,7 +258,7 @@ public class TestOmMetrics {
 
     Mockito.doReturn(null).when(mockKm).openKey(null);
     Mockito.doNothing().when(mockKm).deleteKey(null);
-    Mockito.doReturn(null).when(mockKm).lookupKey(null);
+    Mockito.doReturn(null).when(mockKm).lookupKey(null, "");
     Mockito.doReturn(null).when(mockKm).listKeys(null, null, null, null, 0);
     Mockito.doNothing().when(mockKm).commitKey(any(OmKeyArgs.class), anyLong());
     Mockito.doReturn(null).when(mockKm).initiateMultipartUpload(
@@ -293,7 +293,7 @@ public class TestOmMetrics {
     // inject exception to test for Failure Metrics
     Mockito.doThrow(exception).when(mockKm).openKey(null);
     Mockito.doThrow(exception).when(mockKm).deleteKey(null);
-    Mockito.doThrow(exception).when(mockKm).lookupKey(null);
+    Mockito.doThrow(exception).when(mockKm).lookupKey(null, "");
     Mockito.doThrow(exception).when(mockKm).listKeys(
         null, null, null, null, 0);
     Mockito.doThrow(exception).when(mockKm).commitKey(any(OmKeyArgs.class),
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 1259f71..1f8eb73 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -127,10 +127,12 @@ public interface KeyManager extends OzoneManagerFS {
    * DistributedStorageHandler will use to access the data on datanode.
    *
    * @param args the args of the key provided by client.
+   * @param clientAddress a hint to the key manager to order the datanodes in
+   *                      the returned pipeline by distance between the client
+   *                      and each datanode.
    * @return a OmKeyInfo instance client uses to talk to container.
    * @throws IOException
    */
-  OmKeyInfo lookupKey(OmKeyArgs args) throws IOException;
+  OmKeyInfo lookupKey(OmKeyArgs args, String clientAddress) throws IOException;
 
   /**
    * Renames an existing key within a bucket.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 99e3e66..222b149 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -663,7 +664,8 @@ public class KeyManagerImpl implements KeyManager {
   }
 
   @Override
-  public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
+  public OmKeyInfo lookupKey(OmKeyArgs args, String clientAddress)
+      throws IOException {
     Preconditions.checkNotNull(args);
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
@@ -718,6 +720,7 @@ public class KeyManagerImpl implements KeyManager {
           });
         }
       }
+      sortDatanodeInPipeline(value, clientAddress);
       return value;
     } catch (IOException ex) {
       LOG.debug("Get key failed for volume:{} bucket:{} key:{}",
@@ -1855,7 +1858,8 @@ public class KeyManagerImpl implements KeyManager {
    *                     invalid arguments
    */
   @Override
-  public OmKeyInfo lookupFile(OmKeyArgs args) throws IOException {
+  public OmKeyInfo lookupFile(OmKeyArgs args, String clientAddress)
+      throws IOException {
     Preconditions.checkNotNull(args, "Key args can not be null");
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
@@ -1865,6 +1869,7 @@ public class KeyManagerImpl implements KeyManager {
     try {
       OzoneFileStatus fileStatus = getFileStatus(args);
       if (fileStatus.isFile()) {
+        sortDatanodeInPipeline(fileStatus.getKeyInfo(), clientAddress);
         return fileStatus.getKeyInfo();
       }
       //if key is not of type file or if key is not found we throw an exception
@@ -2052,4 +2057,31 @@ public class KeyManagerImpl implements KeyManager {
     return encInfo;
   }
 
+  private void sortDatanodeInPipeline(OmKeyInfo keyInfo, String clientMachine) {
+    if (keyInfo != null && clientMachine != null && !clientMachine.isEmpty()) {
+      for (OmKeyLocationInfoGroup key : keyInfo.getKeyLocationVersions()) {
+        key.getLocationList().forEach(k -> {
+          List<DatanodeDetails> nodes = k.getPipeline().getNodes();
+          List<String> nodeList = new ArrayList<>();
+          nodes.stream().forEach(node ->
+              nodeList.add(node.getNetworkName()));
+          try {
+            List<DatanodeDetails> sortedNodes = scmClient.getBlockClient()
+                .sortDatanodes(nodeList, clientMachine);
+            k.getPipeline().setNodesInOrder(sortedNodes);
+            LOG.debug("Sort datanodes {} for client {}, return {}", nodes,
+                clientMachine, sortedNodes);
+          } catch (IOException e) {
+            LOG.warn("Unable to sort datanodes based on distance to " +
+                "client, volume=" + keyInfo.getVolumeName() +
+                ", bucket=" + keyInfo.getBucketName() +
+                ", key=" + keyInfo.getKeyName() +
+                ", client=" + clientMachine +
+                ", datanodes=" + nodes.toString() +
+                ", exception=" + e.getMessage());
+          }
+        });
+      }
+    }
+  }
 }
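
To make the contract of sortDatanodeInPipeline concrete, here is a minimal sketch (not part of the patch) of how a reader could pick its target replica from the OmKeyInfo returned by the new lookupKey signature. It assumes getClosestNode() reflects the order produced by sortDatanodes() and falls back to the pipeline's first node when no order has been applied, which is the behaviour the tests above exercise:

    import java.io.IOException;
    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
    import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;

    // Illustrative helper: choose the replica to read the first block from.
    static DatanodeDetails chooseReadTarget(OmKeyInfo keyInfo,
        boolean topologyAwareRead) throws IOException {
      Pipeline pipeline = keyInfo.getLatestVersionLocations()
          .getLocationList().get(0).getPipeline();
      return topologyAwareRead
          ? pipeline.getClosestNode()   // nearest replica after sorting
          : pipeline.getFirstNode();    // original pipeline leader
    }

In the patch itself this decision sits in the client read path rather than in callers of KeyManager; the sketch only illustrates the ordering that lookupKey now conveys.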
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 8032b6d..7b2a83f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -2347,7 +2347,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     boolean auditSuccess = true;
     try {
       metrics.incNumKeyLookups();
-      return keyManager.lookupKey(args);
+      return keyManager.lookupKey(args, getClientAddress());
     } catch (Exception ex) {
       metrics.incNumKeyLookupFails();
       auditSuccess = false;
@@ -2550,6 +2550,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
+  private static String getClientAddress() {
+    String clientMachine = Server.getRemoteAddress();
+    if (clientMachine == null) { //not a RPC client
+      clientMachine = "";
+    }
+    return clientMachine;
+  }
+
   @Override
   public String getRpcPort() {
     return "" + omRpcAddress.getPort();
@@ -2975,7 +2983,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     boolean auditSuccess = true;
     try {
       metrics.incNumLookupFile();
-      return keyManager.lookupFile(args);
+      return keyManager.lookupFile(args, getClientAddress());
     } catch (Exception ex) {
       metrics.incNumLookupFileFails();
       auditSuccess = false;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java
index bff883d..647931a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java
@@ -38,7 +38,16 @@ public interface OzoneManagerFS extends IOzoneAcl {
   OpenKeySession createFile(OmKeyArgs args, boolean isOverWrite,
       boolean isRecursive) throws IOException;
 
-  OmKeyInfo lookupFile(OmKeyArgs args) throws IOException;
+  /**
+   * Looks up a file and returns its info to the client.
+   *
+   * @param args the args of the key provided by client.
+   * @param clientAddress a hint to the key manager to order the datanodes in
+   *                      the returned pipeline by distance between the client
+   *                      and each datanode.
+   * @return an OmKeyInfo instance the client uses to talk to the container.
+   * @throws IOException
+   */
+  OmKeyInfo lookupFile(OmKeyArgs args, String clientAddress) throws IOException;
 
   List<OzoneFileStatus> listStatus(OmKeyArgs keyArgs, boolean recursive,
       String startKey, long numEntries) throws IOException;
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java
index e546d79..b8534de 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java
@@ -183,6 +183,12 @@ public class ScmBlockLocationTestIngClient implements ScmBlockLocationProtocol {
   }
 
   @Override
+  public List<DatanodeDetails> sortDatanodes(List<String> nodes,
+      String clientMachine) throws IOException {
+    return null;
+  }
+
+  @Override
   public void close() throws IOException {
 
   }



