hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ayushsax...@apache.org
Subject [hadoop] branch trunk updated: HDFS-14283. DFSInputStream to prefer cached replica. Contributed by Lisheng Sun.
Date Wed, 06 May 2020 11:28:47 GMT
This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7fddf48  HDFS-14283. DFSInputStream to prefer cached replica. Contributed by Lisheng Sun.
7fddf48 is described below

commit 7fddf4855e92627e11063318ac70f59e9316879c
Author: Ayush Saxena <ayushsaxena@apache.org>
AuthorDate: Wed May 6 16:55:04 2020 +0530

    HDFS-14283. DFSInputStream to prefer cached replica. Contributed by Lisheng Sun.
---
 .../org/apache/hadoop/hdfs/DFSInputStream.java     | 26 +++++++--
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |  3 ++
 .../hadoop/hdfs/client/impl/DfsClientConf.java     | 13 +++++
 .../apache/hadoop/hdfs/protocol/LocatedBlock.java  |  1 +
 .../src/main/resources/hdfs-default.xml            |  9 ++++
 .../org/apache/hadoop/hdfs/TestDFSInputStream.java | 63 ++++++++++++++++++++++
 6 files changed, 112 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index af9891a..0676cf9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1054,10 +1054,21 @@ public class DFSInputStream extends FSInputStream
     StorageType[] storageTypes = block.getStorageTypes();
     DatanodeInfo chosenNode = null;
     StorageType storageType = null;
-    if (nodes != null) {
+    if (dfsClient.getConf().isReadUseCachePriority()) {
+      DatanodeInfo[] cachedLocs = block.getCachedLocations();
+      if (cachedLocs != null) {
+        for (int i = 0; i < cachedLocs.length; i++) {
+          if (isValidNode(cachedLocs[i], ignoredNodes)) {
+            chosenNode = cachedLocs[i];
+            break;
+          }
+        }
+      }
+    }
+
+    if (chosenNode == null && nodes != null) {
       for (int i = 0; i < nodes.length; i++) {
-        if (!dfsClient.getDeadNodes(this).containsKey(nodes[i])
-            && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
+        if (isValidNode(nodes[i], ignoredNodes)) {
           chosenNode = nodes[i];
           // Storage types are ordered to correspond with nodes, so use the same
           // index to get storage type.
@@ -1090,6 +1101,15 @@ public class DFSInputStream extends FSInputStream
         ", ignoredNodes = " + ignoredNodes);
   }
 
+  private boolean isValidNode(DatanodeInfo node,
+      Collection<DatanodeInfo> ignoredNodes) {
+    if (!dfsClient.getDeadNodes(this).containsKey(node)
+        && (ignoredNodes == null || !ignoredNodes.contains(node))) {
+      return true;
+    }
+    return false;
+  }
+
   private static String getBestNodeDNAddrPairErrorString(
       DatanodeInfo nodes[], AbstractMap<DatanodeInfo,
       DatanodeInfo> deadNodes, Collection<DatanodeInfo> ignoredNodes) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 407462c..efc2766 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -108,6 +108,9 @@ public interface HdfsClientConfigKeys {
   String  DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL =
       "dfs.client.use.legacy.blockreader.local";
   boolean DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT = false;
+  String DFS_CLIENT_READ_USE_CACHE_PRIORITY =
+      "dfs.client.read.use.cache.priority";
+  boolean DFS_CLIENT_READ_USE_CACHE_PRIORITY_DEFAULT = false;
   String  DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY =
       "dfs.client.datanode-restart.timeout";
   long    DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT = 30;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index 07f0eee..918fef7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -60,6 +60,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_READ_USE_CACHE_PRIORITY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_READ_USE_CACHE_PRIORITY_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
@@ -150,6 +152,8 @@ public class DfsClientConf {
 
   private final boolean dataTransferTcpNoDelay;
 
+  private final boolean readUseCachePriority;
+
   private final boolean deadNodeDetectionEnabled;
   private final long leaseHardLimitPeriod;
 
@@ -260,6 +264,8 @@ public class DfsClientConf {
     slowIoWarningThresholdMs = conf.getLong(
         DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
         DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
+    readUseCachePriority = conf.getBoolean(DFS_CLIENT_READ_USE_CACHE_PRIORITY,
+        DFS_CLIENT_READ_USE_CACHE_PRIORITY_DEFAULT);
 
     refreshReadBlockLocationsMS = conf.getLong(
         HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY,
@@ -631,6 +637,13 @@ public class DfsClientConf {
   }
 
   /**
+   * @return the readUseCachePriority
+   */
+  public boolean isReadUseCachePriority() {
+    return readUseCachePriority;
+  }
+
+  /**
    * @return the replicaAccessorBuilderClasses
    */
   public List<Class<? extends ReplicaAccessorBuilder>>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
index 29f1b6d..c1b6a54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
@@ -268,6 +268,7 @@ public class LocatedBlock {
         + "; corrupt=" + corrupt
         + "; offset=" + offset
         + "; locs=" + Arrays.asList(locs)
+        + "; cachedLocs=" + Arrays.asList(cachedLocs)
         + "}";
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 913e47b..bf18f3a 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2979,6 +2979,15 @@
 </property>
 
 <property>
+  <name>dfs.client.read.use.cache.priority</name>
+  <value>false</value>
+  <description>
+    If true, the cached replica of the datanode is preferred
+    else the replica closest to client is preferred.
+  </description>
+</property>
+
+<property>
   <name>dfs.block.local-path-access.user</name>
   <value></value>
   <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
index 0d322da..bdc342a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java
@@ -17,11 +17,14 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_READ_USE_CACHE_PRIORITY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
@@ -33,7 +36,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
@@ -219,4 +224,62 @@ public class TestDFSInputStream {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testReadWithPreferredCachingReplica() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFS_CLIENT_READ_USE_CACHE_PRIORITY, true);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = null;
+    Path filePath = new Path("/testReadPreferredCachingReplica");
+    try {
+      fs = cluster.getFileSystem();
+      FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
+      DFSInputStream dfsInputStream =
+          (DFSInputStream) fs.open(filePath).getWrappedStream();
+      LocatedBlock lb = mock(LocatedBlock.class);
+      when(lb.getCachedLocations()).thenReturn(new DatanodeInfo[0]);
+      DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0", 1111,
+          1112, 1113, 1114);
+      DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
+      when(lb.getCachedLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
+      DatanodeInfo retDNInfo =
+          dfsInputStream.getBestNodeDNAddrPair(lb, null).info;
+      assertEquals(dnInfo, retDNInfo);
+    } finally {
+      fs.delete(filePath, true);
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testReadWithoutPreferredCachingReplica() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFS_CLIENT_READ_USE_CACHE_PRIORITY, false);
+    MiniDFSCluster cluster =
+            new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = null;
+    Path filePath = new Path("/testReadWithoutPreferredCachingReplica");
+    try {
+      fs = cluster.getFileSystem();
+      FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
+      DFSInputStream dfsInputStream =
+              (DFSInputStream) fs.open(filePath).getWrappedStream();
+      LocatedBlock lb = mock(LocatedBlock.class);
+      when(lb.getCachedLocations()).thenReturn(new DatanodeInfo[0]);
+      DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0", 1111,
+              1112, 1113, 1114);
+      DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
+      when(lb.getLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
+      DatanodeInfo retDNInfo =
+              dfsInputStream.getBestNodeDNAddrPair(lb, null).info;
+      assertEquals(dnInfo, retDNInfo);
+    } finally {
+      fs.delete(filePath, true);
+      cluster.shutdown();
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message