From: arp@apache.org
To: common-commits@hadoop.apache.org
Date: Mon, 09 Feb 2015 20:32:53 -0000
Message-Id: <3394bd100ebc4e5988c3a654c76e4548@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [1/2] hadoop git commit: HDFS-7647. DatanodeManager.sortLocatedBlocks sorts DatanodeInfos but not StorageIDs. (Contributed by Milan Desai)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 b1aad1d94 -> ff900eb64
  refs/heads/trunk 241336ca2 -> ab934e859


HDFS-7647. DatanodeManager.sortLocatedBlocks sorts DatanodeInfos but not StorageIDs. (Contributed by Milan Desai)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ab934e85
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ab934e85
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ab934e85

Branch: refs/heads/trunk
Commit: ab934e85947dcf2092050023909dd81ae274ff45
Parents: 241336c
Author: Arpit Agarwal
Authored: Mon Feb 9 12:17:40 2015 -0800
Committer: Arpit Agarwal
Committed: Mon Feb 9 12:17:40 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/net/NetworkTopology.java     |  2 +-
 .../net/NetworkTopologyWithNodeGroup.java          |  2 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt        |  3 +
 .../hadoop/hdfs/protocol/LocatedBlock.java         | 77 ++++++++++++++----
 .../server/blockmanagement/DatanodeManager.java    |  2 +
 .../protocol/DatanodeInfoWithStorage.java          | 59 ++++++++++++++
 .../apache/hadoop/hdfs/TestDecommission.java       | 10 ++-
 .../blockmanagement/TestDatanodeManager.java       | 84 ++++++++++++++++++++
 8 files changed, 218 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
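
The problem this patch fixes: DatanodeManager.sortLocatedBlocks() reorders each block's location array (in the new test, the reader-local node ends up first and decommissioned nodes last), but LocatedBlock kept the storage IDs and storage types of the replicas in separate arrays indexed in parallel with the locations. Sorting only the locations left storageIDs[i] and storageTypes[i] describing the wrong replica. The change below makes every location a DatanodeInfoWithStorage that carries its own storage ID and type, so the three pieces of information stay together under any reordering; the old arrays survive only as lazily rebuilt caches. A minimal, self-contained sketch of the failure mode, using hypothetical stand-in data rather than the HDFS classes:

    // Illustration only -- hypothetical stand-in data, not the HDFS types.
    import java.util.Arrays;

    public class ParallelArraySortDemo {
      public static void main(String[] args) {
        String[] locs       = { "dnB", "dnA", "dnC" };  // datanode per replica
        String[] storageIDs = { "sB",  "sA",  "sC"  };  // storage ID per replica, parallel to locs

        Arrays.sort(locs);  // reorder the locations only, as the old code effectively did
        // locs is now [dnA, dnB, dnC], but storageIDs is still [sB, sA, sC]:
        System.out.println(locs[0] + " reported on storage " + storageIDs[0]);  // dnA on sB -- wrong

        // Keeping each location and its storage ID in one object preserves the pairing.
        String[][] withStorage = { { "dnB", "sB" }, { "dnA", "sA" }, { "dnC", "sC" } };
        Arrays.sort(withStorage, (a, b) -> a[0].compareTo(b[0]));
        System.out.println(withStorage[0][0] + " reported on storage " + withStorage[0][1]);  // dnA on sA
      }
    }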
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index aaa5ae3..fc8bf52 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -859,7 +859,7 @@ public class NetworkTopology {
     // Start off by initializing to off rack
     int weight = 2;
     if (reader != null) {
-      if (reader == node) {
+      if (reader.equals(node)) {
         weight = 0;
       } else if (isOnSameRack(reader, node)) {
         weight = 1;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
index 13160eb..3de49dc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
@@ -254,7 +254,7 @@ public class NetworkTopologyWithNodeGroup extends NetworkTopology {
     // Start off by initializing to off rack
     int weight = 3;
     if (reader != null) {
-      if (reader == node) {
+      if (reader.equals(node)) {
         weight = 0;
       } else if (isOnSameNodeGroup(reader, node)) {
         weight = 1;
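
Both weight calculations above previously compared the reader and the candidate node by reference. With this patch, LocatedBlock.getLocations() hands out DatanodeInfoWithStorage wrappers rather than the DatanodeManager's own DatanodeInfo objects, so the reader and a location can denote the same datanode while being distinct objects; "reader == node" would then never match and the local replica would be weighted as if it were remote. equals() compares datanode identity instead of object identity, which keeps the weighting correct. A self-contained illustration of the distinction, with hypothetical stand-in classes rather than the HDFS types:

    import java.util.Objects;

    public class IdentityVsEqualsDemo {
      // Stand-in for a datanode: identity is its UUID, not the object reference.
      static class Node {
        final String uuid;
        Node(String uuid) { this.uuid = uuid; }
        @Override public boolean equals(Object o) {
          return o instanceof Node && ((Node) o).uuid.equals(this.uuid);
        }
        @Override public int hashCode() { return Objects.hashCode(uuid); }
      }

      // Plays the role of DatanodeInfoWithStorage: same identity, extra payload.
      static class NodeWithStorage extends Node {
        final String storageID;
        NodeWithStorage(Node n, String storageID) { super(n.uuid); this.storageID = storageID; }
      }

      public static void main(String[] args) {
        Node reader = new Node("dn-uuid-1");
        Node wrapped = new NodeWithStorage(reader, "storage-1");
        System.out.println(reader == wrapped);       // false: distinct objects
        System.out.println(reader.equals(wrapped));  // true: same datanode identity
      }
    }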
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index eda3744..4396e3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -872,6 +872,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7741. Remove unnecessary synchronized in FSDataInputStream and
     HdfsDataInputStream. (yliu)
 
+    HDFS-7647. DatanodeManager.sortLocatedBlocks sorts DatanodeInfos
+    but not StorageIDs. (Milan Desai via Arpit Agarwal)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
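
The LocatedBlock changes that follow are the core of the fix: the locations become DatanodeInfoWithStorage objects, hasStorageIDs/hasStorageTypes record whether storage information was supplied at all, and the storageIDs/storageTypes arrays turn into caches that getStorageIDs()/getStorageTypes() rebuild from the locations on demand. Per the new getLocations() javadoc, a caller that modifies the returned (shared) array must call invalidateCachedStorageInfo() immediately afterwards. A hedged usage sketch -- the helper name is made up here, but the LocatedBlock calls are the ones this patch adds:

    import java.util.Arrays;
    import java.util.Comparator;
    import org.apache.hadoop.hdfs.StorageType;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeInfoWithStorage;

    final class LocatedBlockReorderSketch {
      // Reorder a block's locations and keep the cached storage info consistent.
      static void reorder(LocatedBlock block, Comparator<DatanodeInfoWithStorage> cmp) {
        DatanodeInfoWithStorage[] locations = block.getLocations();  // shared array, not a copy
        Arrays.sort(locations, cmp);           // in-place reordering of the shared array
        block.invalidateCachedStorageInfo();   // drop the now-stale cached ID/type arrays
        String[] ids = block.getStorageIDs();  // rebuilt lazily from the reordered locations
        StorageType[] types = block.getStorageTypes();
        // ids[i] and types[i] (when non-null) once again describe locations[i].
      }
    }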
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
index 30368f6..7fb2e30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.collect.Lists;
@@ -41,11 +42,13 @@ public class LocatedBlock {
 
   private final ExtendedBlock b;
   private long offset;  // offset of the first byte of the block in the file
-  private final DatanodeInfo[] locs;
-  /** Storage ID for each replica */
-  private final String[] storageIDs;
-  // Storage type for each replica, if reported.
-  private final StorageType[] storageTypes;
+  private final DatanodeInfoWithStorage[] locs;
+  private final boolean hasStorageIDs;
+  private final boolean hasStorageTypes;
+  /** Cached storage ID for each replica */
+  private String[] storageIDs;
+  /** Cached storage type for each replica, if reported. */
+  private StorageType[] storageTypes;
   // corrupt flag is true if all of the replicas of a block are corrupt.
   // else false. If block has few corrupt replicas, they are filtered and
   // their locations are not part of this object
@@ -57,7 +60,8 @@ public class LocatedBlock {
   private DatanodeInfo[] cachedLocs;
 
   // Used when there are no locations
-  private static final DatanodeInfo[] EMPTY_LOCS = new DatanodeInfo[0];
+  private static final DatanodeInfoWithStorage[] EMPTY_LOCS =
+      new DatanodeInfoWithStorage[0];
 
   public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {
     this(b, locs, -1, false); // startOffset is unknown
@@ -94,10 +98,22 @@ public class LocatedBlock {
     if (locs==null) {
       this.locs = EMPTY_LOCS;
     } else {
-      this.locs = locs;
+      this.locs = new DatanodeInfoWithStorage[locs.length];
+      for(int i = 0; i < locs.length; i++) {
+        DatanodeInfo di = locs[i];
+        DatanodeInfoWithStorage storage = new DatanodeInfoWithStorage(di,
+            storageIDs != null ? storageIDs[i] : null,
+            storageTypes != null ? storageTypes[i] : null);
+        storage.setDependentHostNames(di.getDependentHostNames());
+        storage.setLevel(di.getLevel());
+        storage.setParent(di.getParent());
+        this.locs[i] = storage;
+      }
     }
     this.storageIDs = storageIDs;
     this.storageTypes = storageTypes;
+    this.hasStorageIDs = storageIDs != null;
+    this.hasStorageTypes = storageTypes != null;
 
     if (cachedLocs == null || cachedLocs.length == 0) {
       this.cachedLocs = EMPTY_LOCS;
@@ -118,18 +134,53 @@ public class LocatedBlock {
     return b;
   }
 
-  public DatanodeInfo[] getLocations() {
+  /**
+   * Returns the locations associated with this block. The returned array is not
+   * expected to be modified. If it is, caller must immediately invoke
+   * {@link org.apache.hadoop.hdfs.protocol.LocatedBlock#invalidateCachedStorageInfo}
+   * to invalidate the cached Storage ID/Type arrays.
+   */
+  public DatanodeInfoWithStorage[] getLocations() {
     return locs;
   }
 
   public StorageType[] getStorageTypes() {
+    if(!hasStorageTypes) {
+      return null;
+    }
+    if(storageTypes != null) {
+      return storageTypes;
+    }
+    storageTypes = new StorageType[locs.length];
+    for(int i = 0; i < locs.length; i++) {
+      storageTypes[i] = locs[i].getStorageType();
+    }
     return storageTypes;
   }
 
   public String[] getStorageIDs() {
+    if(!hasStorageIDs) {
+      return null;
+    }
+    if(storageIDs != null) {
+      return storageIDs;
+    }
+    storageIDs = new String[locs.length];
+    for(int i = 0; i < locs.length; i++) {
+      storageIDs[i] = locs[i].getStorageID();
+    }
     return storageIDs;
   }
 
+  /**
+   * Invalidates the cached StorageID and StorageType information. Must be
+   * called when the locations array is modified.
+   */
+  public void invalidateCachedStorageInfo() {
+    storageIDs = null;
+    storageTypes = null;
+  }
+
   public long getStartOffset() {
     return offset;
   }
@@ -161,9 +212,9 @@ public class LocatedBlock {
       return;
     }
    // Try to re-use a DatanodeInfo already in loc
-    for (int i=0; i

 nodes = new ArrayList();
     ArrayList dnInfos = new ArrayList();
-
+
+    DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
     for (DatanodeInfo datanodeInfo : dnInfos4FirstBlock) {
       DatanodeInfo found = datanodeInfo;
       for (DatanodeInfo dif: dnInfos4LastBlock) {
         if (datanodeInfo.equals(dif)) {
-
          found = null;
         }
       }
       if (found != null) {
         nodes.add(found.getXferAddr());
-        dnInfos.add(found);
+        dnInfos.add(dm.getDatanode(found));
       }
     }
     //decommission one of the 3 nodes which have last block
     nodes.add(dnInfos4LastBlock[0].getXferAddr());
-    dnInfos.add(dnInfos4LastBlock[0]);
+    dnInfos.add(dm.getDatanode(dnInfos4LastBlock[0]));
 
     writeConfigFile(excludeFile, nodes);
     refreshNodes(ns, conf);
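
The TestDecommission change above follows from the wrapping: the DatanodeInfo objects obtained from block locations are now DatanodeInfoWithStorage wrappers rather than the NameNode's own datanode descriptors, so a test that later drives and observes decommissioning resolves the DatanodeManager's descriptor first via getDatanode(). A hedged sketch of that lookup, using only calls that appear in this patch:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;

    final class ResolveDescriptorSketch {
      // Map a (wrapper) location back to the DatanodeManager's own descriptor.
      static DatanodeInfo resolve(DatanodeManager dm, LocatedBlock block, int i)
          throws IOException {
        DatanodeInfo wrapper = block.getLocations()[i];  // a DatanodeInfoWithStorage wrapper
        return dm.getDatanode(wrapper);                  // descriptor registered with the manager
      }
    }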
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index 2c65fff..adf31a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
@@ -31,13 +32,19 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.*;
 
 public class TestDatanodeManager {
@@ -210,4 +217,81 @@
     public void reloadCachedMappings(List<String> names) {
     }
   }
+
+  /**
+   * This test creates a LocatedBlock with 5 locations, sorts the locations
+   * based on the network topology, and ensures the locations are still aligned
+   * with the storage ids and storage types.
+   */
+  @Test
+  public void testSortLocatedBlocks() throws IOException {
+    // create the DatanodeManager which will be tested
+    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
+    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
+    DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class),
+        fsn, new Configuration());
+
+    // register 5 datanodes, each with different storage ID and type
+    DatanodeInfo[] locs = new DatanodeInfo[5];
+    String[] storageIDs = new String[5];
+    StorageType[] storageTypes = new StorageType[]{
+        StorageType.ARCHIVE,
+        StorageType.DEFAULT,
+        StorageType.DISK,
+        StorageType.RAM_DISK,
+        StorageType.SSD
+    };
+    for(int i = 0; i < 5; i++) {
+      // register new datanode
+      String uuid = "UUID-"+i;
+      String ip = "IP-" + i;
+      DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
+      Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
+      Mockito.when(dr.getIpAddr()).thenReturn(ip);
+      Mockito.when(dr.getXferAddr()).thenReturn(ip + ":9000");
+      Mockito.when(dr.getXferPort()).thenReturn(9000);
+      Mockito.when(dr.getSoftwareVersion()).thenReturn("version1");
+      dm.registerDatanode(dr);
+
+      // get location and storage information
+      locs[i] = dm.getDatanode(uuid);
+      storageIDs[i] = "storageID-"+i;
+    }
+
+    // set first 2 locations as decomissioned
+    locs[0].setDecommissioned();
+    locs[1].setDecommissioned();
+
+    // create LocatedBlock with above locations
+    ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
+    LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes);
+    List<LocatedBlock> blocks = new ArrayList<>();
+    blocks.add(block);
+
+    final String targetIp = locs[4].getIpAddr();
+
+    // sort block locations
+    dm.sortLocatedBlocks(targetIp, blocks);
+
+    // check that storage IDs/types are aligned with datanode locs
+    DatanodeInfoWithStorage[] sortedLocs = block.getLocations();
+    storageIDs = block.getStorageIDs();
+    storageTypes = block.getStorageTypes();
+    assertThat(sortedLocs.length, is(5));
+    assertThat(storageIDs.length, is(5));
+    assertThat(storageTypes.length, is(5));
+    for(int i = 0; i < sortedLocs.length; i++) {
+      assertThat(sortedLocs[i].getStorageID(), is(storageIDs[i]));
+      assertThat(sortedLocs[i].getStorageType(), is(storageTypes[i]));
+    }
+
+    // Ensure the local node is first.
+    assertThat(sortedLocs[0].getIpAddr(), is(targetIp));
+
+    // Ensure the two decommissioned DNs were moved to the end.
+    assertThat(sortedLocs[sortedLocs.length-1].getAdminState(),
+        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+    assertThat(sortedLocs[sortedLocs.length-2].getAdminState(),
+        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+  }
 }
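
The new test above pins down the invariant the patch establishes: after DatanodeManager.sortLocatedBlocks() reorders the locations (reader-local node first, the two decommissioned nodes last), getStorageIDs()[i] and getStorageTypes()[i] still describe getLocations()[i]. The same alignment check could be factored into a small helper; a hedged sketch (helper name is illustrative, the accessors are the ones added by this patch):

    import static org.junit.Assert.assertEquals;

    import org.apache.hadoop.hdfs.StorageType;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeInfoWithStorage;

    final class LocatedBlockAssertions {
      // Assert that the cached storage ID/type arrays line up with the locations.
      static void assertStorageInfoAligned(LocatedBlock block) {
        DatanodeInfoWithStorage[] locs = block.getLocations();
        String[] ids = block.getStorageIDs();
        StorageType[] types = block.getStorageTypes();
        assertEquals(locs.length, ids.length);
        assertEquals(locs.length, types.length);
        for (int i = 0; i < locs.length; i++) {
          assertEquals(ids[i], locs[i].getStorageID());
          assertEquals(types[i], locs[i].getStorageType());
        }
      }
    }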