Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E4A05200BD5 for ; Thu, 8 Dec 2016 23:29:25 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id E35FB160B27; Thu, 8 Dec 2016 22:29:25 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id DB177160B30 for ; Thu, 8 Dec 2016 23:29:24 +0100 (CET) Received: (qmail 83536 invoked by uid 500); 8 Dec 2016 22:29:10 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 81798 invoked by uid 99); 8 Dec 2016 22:29:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Dec 2016 22:29:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 57786F2DCF; Thu, 8 Dec 2016 22:29:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: stevel@apache.org To: common-commits@hadoop.apache.org Date: Thu, 08 Dec 2016 22:29:42 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [35/50] [abbrv] hadoop git commit: HDFS-10206. Datanodes not sorted properly by distance when the reader isn't a datanode. (Nandakumar via mingma) archived-at: Thu, 08 Dec 2016 22:29:26 -0000 HDFS-10206. Datanodes not sorted properly by distance when the reader isn't a datanode. 
(Nandakumar via mingma) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c73e08a6 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c73e08a6 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c73e08a6 Branch: refs/heads/HADOOP-13345 Commit: c73e08a6dad46cad14b38a4a586a5cda1622b206 Parents: 563480d Author: Ming Ma Authored: Wed Dec 7 08:26:09 2016 -0800 Committer: Ming Ma Committed: Wed Dec 7 08:26:09 2016 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/net/NetworkTopology.java | 158 +++++++++++++++++-- .../server/blockmanagement/DatanodeManager.java | 12 +- .../apache/hadoop/net/TestNetworkTopology.java | 29 +++- 3 files changed, 182 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73e08a6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java index 14c870d..5751d2b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java @@ -57,6 +57,10 @@ public class NetworkTopology { public static final Logger LOG = LoggerFactory.getLogger(NetworkTopology.class); + private static final char PATH_SEPARATOR = '/'; + private static final String PATH_SEPARATOR_STR = "/"; + private static final String ROOT = "/"; + public static class InvalidTopologyException extends RuntimeException { private static final long serialVersionUID = 1L; public InvalidTopologyException(String msg) { @@ -916,7 +920,7 
@@ public class NetworkTopology { } } - /** convert a network tree to a string */ + /** convert a network tree to a string. */ @Override public String toString() { // print the number of racks @@ -970,19 +974,108 @@ public class NetworkTopology { * @return weight */ protected int getWeight(Node reader, Node node) { - // 0 is local, 1 is same rack, 2 is off rack - // Start off by initializing to off rack - int weight = 2; - if (reader != null) { - if (reader.equals(node)) { - weight = 0; - } else if (isOnSameRack(reader, node)) { - weight = 1; + // 0 is local, 2 is same rack, and each level on each node increases the + //weight by 1 + //Start off by initializing to Integer.MAX_VALUE + int weight = Integer.MAX_VALUE; + if (reader != null && node != null) { + if(reader.equals(node)) { + return 0; + } + int maxReaderLevel = reader.getLevel(); + int maxNodeLevel = node.getLevel(); + int currentLevelToCompare = maxReaderLevel > maxNodeLevel ? + maxNodeLevel : maxReaderLevel; + Node r = reader; + Node n = node; + weight = 0; + while(r != null && r.getLevel() > currentLevelToCompare) { + r = r.getParent(); + weight++; + } + while(n != null && n.getLevel() > currentLevelToCompare) { + n = n.getParent(); + weight++; + } + while(r != null && n != null && !r.equals(n)) { + r = r.getParent(); + n = n.getParent(); + weight+=2; + } + } + return weight; + } + + /** + * Returns an integer weight which specifies how far away node is + * from reader. A lower value signifies that a node is closer. 
+ * It uses network location to calculate the weight + * + * @param reader Node where data will be read + * @param node Replica of data + * @return weight + */ + private static int getWeightUsingNetworkLocation(Node reader, Node node) { + //Start off by initializing to Integer.MAX_VALUE + int weight = Integer.MAX_VALUE; + if(reader != null && node != null) { + String readerPath = normalizeNetworkLocationPath( + reader.getNetworkLocation()); + String nodePath = normalizeNetworkLocationPath( + node.getNetworkLocation()); + + //same rack + if(readerPath.equals(nodePath)) { + if(reader.getName().equals(node.getName())) { + weight = 0; + } else { + weight = 2; + } + } else { + String[] readerPathToken = readerPath.split(PATH_SEPARATOR_STR); + String[] nodePathToken = nodePath.split(PATH_SEPARATOR_STR); + int maxLevelToCompare = readerPathToken.length > nodePathToken.length ? + nodePathToken.length : readerPathToken.length; + int currentLevel = 1; + //traverse through the path and calculate the distance + while(currentLevel < maxLevelToCompare) { + if(!readerPathToken[currentLevel] + .equals(nodePathToken[currentLevel])){ + break; + } + currentLevel++; + } + weight = (readerPathToken.length - currentLevel) + + (nodePathToken.length - currentLevel); } } return weight; } + /** Normalize a path by stripping off any trailing {@link #PATH_SEPARATOR}. + * @param path path to normalize. 
+ * @return the normalized path + * If path is null or empty {@link #ROOT} is returned + * @throws IllegalArgumentException if the first character of a non-empty path + * is not {@link #PATH_SEPARATOR} + */ + private static String normalizeNetworkLocationPath(String path) { + if (path == null || path.length() == 0) { + return ROOT; + } + + if (path.charAt(0) != PATH_SEPARATOR) { + throw new IllegalArgumentException("Network Location" + + "path doesn't start with " +PATH_SEPARATOR+ ": "+path); + } + + int len = path.length(); + if (path.charAt(len-1) == PATH_SEPARATOR) { + return path.substring(0, len-1); + } + return path; + } + /** * Sort nodes array by network distance to reader. *

@@ -999,10 +1092,55 @@ public class NetworkTopology { * @param activeLen Number of active nodes at the front of the array */ public void sortByDistance(Node reader, Node[] nodes, int activeLen) { + /* + * This method is called if the reader is a datanode, + * so nonDataNodeReader flag is set to false. + */ + sortByDistance(reader, nodes, activeLen, false); + } + + /** + * Sort nodes array by network distance to reader. + *

using network location. This is used when the reader + * is not a datanode. Sorting the nodes based on network distance + * from the reader reduces network traffic and improves + * performance. + *

+ * + * @param reader Node where data will be read + * @param nodes Available replicas with the requested data + * @param activeLen Number of active nodes at the front of the array + */ + public void sortByDistanceUsingNetworkLocation(Node reader, Node[] nodes, + int activeLen) { + /* + * This method is called if the reader is not a datanode, + * so nonDataNodeReader flag is set to true. + */ + sortByDistance(reader, nodes, activeLen, true); + } + + /** + * Sort nodes array by network distance to reader. + *

+ * As an additional twist, we also randomize the nodes at each network + * distance. This helps with load balancing when there is data skew. + * + * @param reader Node where data will be read + * @param nodes Available replicas with the requested data + * @param activeLen Number of active nodes at the front of the array + * @param nonDataNodeReader True if the reader is not a datanode + */ + private void sortByDistance(Node reader, Node[] nodes, int activeLen, + boolean nonDataNodeReader) { /** Sort weights for the nodes array */ int[] weights = new int[activeLen]; for (int i=0; i> tree = new TreeMap>(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73e08a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 47f15c4..6477d5c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -443,9 +443,11 @@ public class DatanodeManager { Comparator comparator) { // As it is possible for the separation of node manager and datanode, // here we should get node but not datanode only . 
+ boolean nonDatanodeReader = false; Node client = getDatanodeByHost(targetHost); if (client == null) { - List hosts = new ArrayList<> (1); + nonDatanodeReader = true; + List hosts = new ArrayList<>(1); hosts.add(targetHost); List resolvedHosts = dnsToSwitchMapping.resolve(hosts); if (resolvedHosts != null && !resolvedHosts.isEmpty()) { @@ -470,8 +472,12 @@ public class DatanodeManager { --lastActiveIndex; } int activeLen = lastActiveIndex + 1; - networktopology.sortByDistance(client, lb.getLocations(), activeLen); - + if(nonDatanodeReader) { + networktopology.sortByDistanceUsingNetworkLocation(client, + lb.getLocations(), activeLen); + } else { + networktopology.sortByDistance(client, lb.getLocations(), activeLen); + } // must update cache since we modified locations array lb.updateCachedStorageInfo(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73e08a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java index d149f65..3a281fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java @@ -220,11 +220,9 @@ public class TestNetworkTopology { testNodes[2] = dataNodes[3]; cluster.setRandomSeed(0xDEAD); cluster.sortByDistance(dataNodes[0], testNodes, testNodes.length); - // sortByDistance does not take the "data center" layer into consideration - // and it doesn't sort by getDistance, so 1, 5, 3 is also valid here assertTrue(testNodes[0] == dataNodes[1]); - assertTrue(testNodes[1] == dataNodes[5]); - assertTrue(testNodes[2] == dataNodes[3]); + assertTrue(testNodes[1] == dataNodes[3]); + 
assertTrue(testNodes[2] == dataNodes[5]); // Array of just rack-local nodes // Expect a random first node @@ -264,6 +262,29 @@ public class TestNetworkTopology { } } assertTrue("Expected to find a different first location", foundRandom); + + //Reader is not a datanode, but is in one of the datanode's rack. + testNodes[0] = dataNodes[0]; + testNodes[1] = dataNodes[5]; + testNodes[2] = dataNodes[8]; + Node rackClient = new NodeBase("/d3/r1/25.25.25"); + cluster.setRandomSeed(0xDEADBEEF); + cluster.sortByDistance(rackClient, testNodes, testNodes.length); + assertTrue(testNodes[0] == dataNodes[8]); + assertTrue(testNodes[1] == dataNodes[5]); + assertTrue(testNodes[2] == dataNodes[0]); + + //Reader is not a datanode , but is in one of the datanode's data center. + testNodes[0] = dataNodes[8]; + testNodes[1] = dataNodes[5]; + testNodes[2] = dataNodes[0]; + Node dcClient = new NodeBase("/d1/r2/25.25.25"); + cluster.setRandomSeed(0xDEADBEEF); + cluster.sortByDistance(dcClient, testNodes, testNodes.length); + assertTrue(testNodes[0] == dataNodes[0]); + assertTrue(testNodes[1] == dataNodes[5]); + assertTrue(testNodes[2] == dataNodes[8]); + } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org