From common-commits-return-98740-archive-asf-public=cust-asf.ponee.io@hadoop.apache.org Wed May  6 11:28:49 2020
X-Original-To: archive-asf-public@cust-asf.ponee.io
Delivered-To: archive-asf-public@cust-asf.ponee.io
Received: from mail.apache.org (hermes.apache.org [207.244.88.153])
    by mx-eu-01.ponee.io (Postfix) with SMTP id 933B018064F
    for ; Wed, 6 May 2020 13:28:49 +0200 (CEST)
Received: (qmail 97644 invoked by uid 500); 6 May 2020 11:28:48 -0000
Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm
Precedence: bulk
Delivered-To: mailing list common-commits@hadoop.apache.org
Received: (qmail 97635 invoked by uid 99); 6 May 2020 11:28:48 -0000
Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70)
    by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 May 2020 11:28:48 +0000
Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33)
    id 199D08BFAB; Wed, 6 May 2020 11:28:47 +0000 (UTC)
Date: Wed, 06 May 2020 11:28:47 +0000
To: "common-commits@hadoop.apache.org"
Subject: [hadoop] branch trunk updated: HDFS-14283. DFSInputStream to prefer cached replica. Contributed by Lisheng Sun.
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: 8bit
Message-ID: <158876452766.31404.16050820194219558891@gitbox.apache.org>
From: ayushsaxena@apache.org
X-Git-Host: gitbox.apache.org
X-Git-Repo: hadoop
X-Git-Refname: refs/heads/trunk
X-Git-Reftype: branch
X-Git-Oldrev: adecdb8b534c536354f4f47789467ffb82dd1496
X-Git-Newrev: 7fddf4855e92627e11063318ac70f59e9316879c
X-Git-Rev: 7fddf4855e92627e11063318ac70f59e9316879c
X-Git-NotificationType: ref_changed_plus_diff
X-Git-Multimail-Version: 1.5.dev
Auto-Submitted: auto-generated

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7fddf48  HDFS-14283. DFSInputStream to prefer cached replica. Contributed by Lisheng Sun.
7fddf48 is described below

commit 7fddf4855e92627e11063318ac70f59e9316879c
Author: Ayush Saxena
AuthorDate: Wed May 6 16:55:04 2020 +0530

    HDFS-14283. DFSInputStream to prefer cached replica. Contributed by Lisheng Sun.
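Before the diff, a quick sketch of how a client could opt in to the new behaviour once this change lands. The sketch is not part of the commit: the property name and its default (false) come from the patch below, while the class name, file path, and read loop are purely illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CachedReplicaReadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Key added by HDFS-14283; defaults to false, so reads keep the old
        // "closest replica" ordering unless it is explicitly enabled.
        conf.setBoolean("dfs.client.read.use.cache.priority", true);

        // Hypothetical path; any HDFS file works. With the flag on, the client
        // first tries replicas cached in DataNode memory (see the
        // DFSInputStream change below) and falls back to the regular replica
        // list when no cached location is usable.
        try (FileSystem fs = FileSystem.get(conf);
             FSDataInputStream in = fs.open(new Path("/tmp/example.txt"))) {
          byte[] buf = new byte[4096];
          int n = in.read(buf);
          System.out.println("Read " + n + " bytes");
        }
      }
    }

The same switch could equally be set cluster-wide in hdfs-site.xml rather than per Configuration object.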
---
 .../org/apache/hadoop/hdfs/DFSInputStream.java     | 26 +++++++--
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |  3 ++
 .../hadoop/hdfs/client/impl/DfsClientConf.java     | 13 +++++
 .../apache/hadoop/hdfs/protocol/LocatedBlock.java  |  1 +
 .../src/main/resources/hdfs-default.xml            |  9 ++++
 .../org/apache/hadoop/hdfs/TestDFSInputStream.java | 63 ++++++++++++++++++++++
 6 files changed, 112 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index af9891a..0676cf9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1054,10 +1054,21 @@ public class DFSInputStream extends FSInputStream
     StorageType[] storageTypes = block.getStorageTypes();
     DatanodeInfo chosenNode = null;
     StorageType storageType = null;
-    if (nodes != null) {
+    if (dfsClient.getConf().isReadUseCachePriority()) {
+      DatanodeInfo[] cachedLocs = block.getCachedLocations();
+      if (cachedLocs != null) {
+        for (int i = 0; i < cachedLocs.length; i++) {
+          if (isValidNode(cachedLocs[i], ignoredNodes)) {
+            chosenNode = cachedLocs[i];
+            break;
+          }
+        }
+      }
+    }
+
+    if (chosenNode == null && nodes != null) {
       for (int i = 0; i < nodes.length; i++) {
-        if (!dfsClient.getDeadNodes(this).containsKey(nodes[i])
-            && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
+        if (isValidNode(nodes[i], ignoredNodes)) {
           chosenNode = nodes[i];
           // Storage types are ordered to correspond with nodes, so use the same
           // index to get storage type.
@@ -1090,6 +1101,15 @@ public class DFSInputStream extends FSInputStream
         ", ignoredNodes = " + ignoredNodes);
   }
 
+  private boolean isValidNode(DatanodeInfo node,
+      Collection<DatanodeInfo> ignoredNodes) {
+    if (!dfsClient.getDeadNodes(this).containsKey(node)
+        && (ignoredNodes == null || !ignoredNodes.contains(node))) {
+      return true;
+    }
+    return false;
+  }
+
   private static String getBestNodeDNAddrPairErrorString(
       DatanodeInfo nodes[], AbstractMap<DatanodeInfo,
      DatanodeInfo> deadNodes, Collection<DatanodeInfo> ignoredNodes) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 407462c..efc2766 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -108,6 +108,9 @@ public interface HdfsClientConfigKeys {
   String DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL =
       "dfs.client.use.legacy.blockreader.local";
   boolean DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT = false;
+  String DFS_CLIENT_READ_USE_CACHE_PRIORITY =
+      "dfs.client.read.use.cache.priority";
+  boolean DFS_CLIENT_READ_USE_CACHE_PRIORITY_DEFAULT = false;
   String DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY =
       "dfs.client.datanode-restart.timeout";
   long DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT = 30;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index 07f0eee..918fef7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -60,6 +60,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_READ_USE_CACHE_PRIORITY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_READ_USE_CACHE_PRIORITY_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
@@ -150,6 +152,8 @@ public class DfsClientConf {
 
   private final boolean dataTransferTcpNoDelay;
 
+  private final boolean readUseCachePriority;
+
   private final boolean deadNodeDetectionEnabled;
 
   private final long leaseHardLimitPeriod;
@@ -260,6 +264,8 @@ public class DfsClientConf {
     slowIoWarningThresholdMs = conf.getLong(
         DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
         DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
+    readUseCachePriority = conf.getBoolean(DFS_CLIENT_READ_USE_CACHE_PRIORITY,
+        DFS_CLIENT_READ_USE_CACHE_PRIORITY_DEFAULT);
 
     refreshReadBlockLocationsMS = conf.getLong(
         HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY,
@@ -631,6 +637,13 @@ public class DfsClientConf {
   }
 
   /**
+   * @return the readUseCachePriority
+   */
+  public boolean isReadUseCachePriority() {
+    return readUseCachePriority;
+  }
+
+  /**
    * @return the replicaAccessorBuilderClasses
    */
   public List<Class<? extends ReplicaAccessorBuilder>>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
index 29f1b6d..c1b6a54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
@@ -268,6 +268,7 @@ public class LocatedBlock {
         + "; corrupt=" + corrupt
         + "; offset=" + offset
         + "; locs=" + Arrays.asList(locs)
+        + "; cachedLocs=" + Arrays.asList(cachedLocs)
         + "}";
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 913e47b..bf18f3a 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2979,6 +2979,15 @@
 </property>
 
 <property>
+  <name>dfs.client.read.use.cache.priority</name>
+  <value>false</value>
+  <description>
+    If true, the cached replica of the datanode is preferred
+    else the replica closest to client is preferred.
+  </description>
+</property>
+
+<property>
   <name>dfs.block.local-path-access.user</name>
   <value></value>
   <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
index 0d322da..bdc342a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
@@ -17,11 +17,14 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_READ_USE_CACHE_PRIORITY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
@@ -33,7 +36,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
@@ -219,4 +224,62 @@ public class TestDFSInputStream {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testReadWithPreferredCachingReplica() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFS_CLIENT_READ_USE_CACHE_PRIORITY, true);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = null;
+    Path filePath = new Path("/testReadPreferredCachingReplica");
+    try {
+      fs = cluster.getFileSystem();
+      FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
+      DFSInputStream dfsInputStream =
+          (DFSInputStream) fs.open(filePath).getWrappedStream();
+      LocatedBlock lb = mock(LocatedBlock.class);
+      when(lb.getCachedLocations()).thenReturn(new DatanodeInfo[0]);
+      DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0", 1111,
+          1112, 1113, 1114);
+      DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
+      when(lb.getCachedLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
+      DatanodeInfo retDNInfo =
+          dfsInputStream.getBestNodeDNAddrPair(lb, null).info;
+      assertEquals(dnInfo, retDNInfo);
+    } finally {
+      fs.delete(filePath, true);
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testReadWithoutPreferredCachingReplica() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFS_CLIENT_READ_USE_CACHE_PRIORITY, false);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = null;
+    Path filePath = new Path("/testReadWithoutPreferredCachingReplica");
+    try {
+      fs = cluster.getFileSystem();
+      FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
+      DFSInputStream dfsInputStream =
+          (DFSInputStream) fs.open(filePath).getWrappedStream();
+      LocatedBlock lb = mock(LocatedBlock.class);
+      when(lb.getCachedLocations()).thenReturn(new DatanodeInfo[0]);
+      DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0", 1111,
+          1112, 1113, 1114);
+      DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
+      when(lb.getLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
+      DatanodeInfo retDNInfo =
+          dfsInputStream.getBestNodeDNAddrPair(lb, null).info;
+      assertEquals(dnInfo, retDNInfo);
+    } finally {
+      fs.delete(filePath, true);
+      cluster.shutdown();
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org