Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B11A02009D9 for ; Thu, 19 May 2016 22:02:33 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id AF91B160A00; Thu, 19 May 2016 20:02:33 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id AC02F1609AE for ; Thu, 19 May 2016 22:02:32 +0200 (CEST) Received: (qmail 14043 invoked by uid 500); 19 May 2016 20:02:31 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 14034 invoked by uid 99); 19 May 2016 20:02:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 May 2016 20:02:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5BF62DFC71; Thu, 19 May 2016 20:02:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jianhe@apache.org To: common-commits@hadoop.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-4002. Make ResourceTrackerService#nodeHeartbeat more concurrent. Contributed by Rohith Sharma K S & Zhiguo Hong Date: Thu, 19 May 2016 20:02:31 +0000 (UTC) archived-at: Thu, 19 May 2016 20:02:33 -0000 Repository: hadoop Updated Branches: refs/heads/branch-2.8 27d6538b8 -> 61fa9256c YARN-4002. Make ResourceTrackerService#nodeHeartbeat more concurrent. 
Contributed by Rohith Sharma K S & Zhiguo Hong (cherry picked from commit feb90ffcca536e7deac50976b8a8774450fe089f) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/61fa9256 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/61fa9256 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/61fa9256 Branch: refs/heads/branch-2.8 Commit: 61fa9256c5c6892a7c1f42ecbda6476a71755caf Parents: 27d6538 Author: Jian He Authored: Thu May 19 13:01:36 2016 -0700 Committer: Jian He Committed: Thu May 19 13:02:24 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/util/HostsFileReader.java | 159 +++++++++++++------ .../apache/hadoop/util/TestHostsFileReader.java | 26 +++ .../resourcemanager/NodesListManager.java | 69 ++++---- 3 files changed, 166 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/61fa9256/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java index cac43c9..c5d6b86 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java @@ -21,6 +21,9 @@ package org.apache.hadoop.util; import java.io.*; import java.util.Set; import java.util.HashSet; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.io.Charsets; import 
org.apache.commons.logging.LogFactory; @@ -38,6 +41,8 @@ public class HostsFileReader { private Set excludes; private String includesFile; private String excludesFile; + private WriteLock writeLock; + private ReadLock readLock; private static final Log LOG = LogFactory.getLog(HostsFileReader.class); @@ -47,6 +52,9 @@ public class HostsFileReader { excludes = new HashSet(); includesFile = inFile; excludesFile = exFile; + ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + this.writeLock = rwLock.writeLock(); + this.readLock = rwLock.readLock(); refresh(); } @@ -57,6 +65,9 @@ public class HostsFileReader { excludes = new HashSet(); this.includesFile = includesFile; this.excludesFile = excludesFile; + ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + this.writeLock = rwLock.writeLock(); + this.readLock = rwLock.readLock(); refresh(inFileInputStream, exFileInputStream); } @@ -101,80 +112,126 @@ public class HostsFileReader { } } - public synchronized void refresh() throws IOException { - LOG.info("Refreshing hosts (include/exclude) list"); - Set newIncludes = new HashSet(); - Set newExcludes = new HashSet(); - boolean switchIncludes = false; - boolean switchExcludes = false; - if (!includesFile.isEmpty()) { - readFileToSet("included", includesFile, newIncludes); - switchIncludes = true; - } - if (!excludesFile.isEmpty()) { - readFileToSet("excluded", excludesFile, newExcludes); - switchExcludes = true; + public void refresh() throws IOException { + this.writeLock.lock(); + try { + refresh(includesFile, excludesFile); + } finally { + this.writeLock.unlock(); } + } - if (switchIncludes) { - // switch the new hosts that are to be included - includes = newIncludes; - } - if (switchExcludes) { - // switch the excluded hosts - excludes = newExcludes; + public void refresh(String includeFiles, String excludeFiles) + throws IOException { + LOG.info("Refreshing hosts (include/exclude) list"); + this.writeLock.lock(); + try { + // update instance 
variables + updateFileNames(includeFiles, excludeFiles); + Set newIncludes = new HashSet(); + Set newExcludes = new HashSet(); + boolean switchIncludes = false; + boolean switchExcludes = false; + if (includeFiles != null && !includeFiles.isEmpty()) { + readFileToSet("included", includeFiles, newIncludes); + switchIncludes = true; + } + if (excludeFiles != null && !excludeFiles.isEmpty()) { + readFileToSet("excluded", excludeFiles, newExcludes); + switchExcludes = true; + } + + if (switchIncludes) { + // switch the new hosts that are to be included + includes = newIncludes; + } + if (switchExcludes) { + // switch the excluded hosts + excludes = newExcludes; + } + } finally { + this.writeLock.unlock(); } } @Private - public synchronized void refresh(InputStream inFileInputStream, + public void refresh(InputStream inFileInputStream, InputStream exFileInputStream) throws IOException { LOG.info("Refreshing hosts (include/exclude) list"); - Set newIncludes = new HashSet(); - Set newExcludes = new HashSet(); - boolean switchIncludes = false; - boolean switchExcludes = false; - if (inFileInputStream != null) { - readFileToSetWithFileInputStream("included", includesFile, - inFileInputStream, newIncludes); - switchIncludes = true; - } - if (exFileInputStream != null) { - readFileToSetWithFileInputStream("excluded", excludesFile, - exFileInputStream, newExcludes); - switchExcludes = true; - } - if (switchIncludes) { - // switch the new hosts that are to be included - includes = newIncludes; + this.writeLock.lock(); + try { + Set newIncludes = new HashSet(); + Set newExcludes = new HashSet(); + boolean switchIncludes = false; + boolean switchExcludes = false; + if (inFileInputStream != null) { + readFileToSetWithFileInputStream("included", includesFile, + inFileInputStream, newIncludes); + switchIncludes = true; + } + if (exFileInputStream != null) { + readFileToSetWithFileInputStream("excluded", excludesFile, + exFileInputStream, newExcludes); + switchExcludes = true; + } + 
if (switchIncludes) { + // switch the new hosts that are to be included + includes = newIncludes; + } + if (switchExcludes) { + // switch the excluded hosts + excludes = newExcludes; + } + } finally { + this.writeLock.unlock(); } - if (switchExcludes) { - // switch the excluded hosts - excludes = newExcludes; + } + + public Set getHosts() { + this.readLock.lock(); + try { + return includes; + } finally { + this.readLock.unlock(); } } - public synchronized Set getHosts() { - return includes; + public Set getExcludedHosts() { + this.readLock.lock(); + try { + return excludes; + } finally { + this.readLock.unlock(); + } } - public synchronized Set getExcludedHosts() { - return excludes; + public void getHostDetails(Set includes, Set excludes) { + this.readLock.lock(); + try { + includes.addAll(this.includes); + excludes.addAll(this.excludes); + } finally { + this.readLock.unlock(); + } } - public synchronized void setIncludesFile(String includesFile) { + public void setIncludesFile(String includesFile) { LOG.info("Setting the includes file to " + includesFile); this.includesFile = includesFile; } - public synchronized void setExcludesFile(String excludesFile) { + public void setExcludesFile(String excludesFile) { LOG.info("Setting the excludes file to " + excludesFile); this.excludesFile = excludesFile; } - public synchronized void updateFileNames(String includesFile, - String excludesFile) { - setIncludesFile(includesFile); - setExcludesFile(excludesFile); + public void updateFileNames(String includeFiles, String excludeFiles) { + this.writeLock.lock(); + try { + setIncludesFile(includeFiles); + setExcludesFile(excludeFiles); + } finally { + this.writeLock.unlock(); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/61fa9256/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java ---------------------------------------------------------------------- diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java index 7de0be8..b1c5825 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java @@ -20,6 +20,8 @@ package org.apache.hadoop.util; import java.io.File; import java.io.FileNotFoundException; import java.io.FileWriter; +import java.util.HashSet; +import java.util.Set; import org.junit.*; import static org.junit.Assert.*; @@ -96,6 +98,30 @@ public class TestHostsFileReader { assertTrue(hfp.getExcludedHosts().contains("somehost5")); assertFalse(hfp.getExcludedHosts().contains("host4")); + // test for refreshing hostreader with new include/exclude host files + String newExcludesFile = HOSTS_TEST_DIR + "/dfs1.exclude"; + String newIncludesFile = HOSTS_TEST_DIR + "/dfs1.include"; + + efw = new FileWriter(newExcludesFile); + ifw = new FileWriter(newIncludesFile); + + efw.write("#DFS-Hosts-excluded\n"); + efw.write("node1\n"); + efw.close(); + + ifw.write("#Hosts-in-DFS\n"); + ifw.write("node2\n"); + ifw.close(); + + hfp.refresh(newIncludesFile, newExcludesFile); + assertTrue(hfp.getExcludedHosts().contains("node1")); + assertTrue(hfp.getHosts().contains("node2")); + + Set hostsList = new HashSet(); + Set excludeList = new HashSet(); + hfp.getHostDetails(hostsList, excludeList); + assertTrue(excludeList.contains("node1")); + assertTrue(hostsList.contains("node2")); } /* http://git-wip-us.apache.org/repos/asf/hadoop/blob/61fa9256/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index a093540..e69db29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -182,10 +182,15 @@ public class NodesListManager extends CompositeService implements YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH) + " out=" + conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH)); - for (String include : hostsReader.getHosts()) { + + Set hostsList = new HashSet(); + Set excludeList = new HashSet(); + hostsReader.getHostDetails(hostsList, excludeList); + + for (String include : hostsList) { LOG.debug("include: " + include); } - for (String exclude : hostsReader.getExcludedHosts()) { + for (String exclude : excludeList) { LOG.debug("exclude: " + exclude); } } @@ -207,25 +212,17 @@ public class NodesListManager extends CompositeService implements private void refreshHostsReader(Configuration yarnConf) throws IOException, YarnException { - synchronized (hostsReader) { - if (null == yarnConf) { - yarnConf = new YarnConfiguration(); - } - includesFile = - yarnConf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, - YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH); - excludesFile = - yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, - YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); - hostsReader.updateFileNames(includesFile, excludesFile); - hostsReader.refresh( - 
includesFile.isEmpty() ? null : this.rmContext - .getConfigurationProvider().getConfigurationInputStream( - this.conf, includesFile), excludesFile.isEmpty() ? null - : this.rmContext.getConfigurationProvider() - .getConfigurationInputStream(this.conf, excludesFile)); - printConfiguredHosts(); - } + if (null == yarnConf) { + yarnConf = new YarnConfiguration(); + } + includesFile = + yarnConf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH); + excludesFile = + yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); + hostsReader.refresh(includesFile, excludesFile); + printConfiguredHosts(); } private void setDecomissionedNMs() { @@ -363,13 +360,13 @@ public class NodesListManager extends CompositeService implements public boolean isValidNode(String hostName) { String ip = resolver.resolve(hostName); - synchronized (hostsReader) { - Set hostsList = hostsReader.getHosts(); - Set excludeList = hostsReader.getExcludedHosts(); - return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList - .contains(ip)) - && !(excludeList.contains(hostName) || excludeList.contains(ip)); - } + Set hostsList = new HashSet(); + Set excludeList = new HashSet(); + hostsReader.getHostDetails(hostsList, excludeList); + + return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList + .contains(ip)) + && !(excludeList.contains(hostName) || excludeList.contains(ip)); } @Override @@ -466,17 +463,15 @@ public class NodesListManager extends CompositeService implements } public boolean isUntrackedNode(String hostName) { - boolean untracked; String ip = resolver.resolve(hostName); - synchronized (hostsReader) { - Set hostsList = hostsReader.getHosts(); - Set excludeList = hostsReader.getExcludedHosts(); - untracked = !hostsList.isEmpty() && - !hostsList.contains(hostName) && !hostsList.contains(ip) && - !excludeList.contains(hostName) && 
!excludeList.contains(ip); - } - return untracked; + Set hostsList = new HashSet(); + Set excludeList = new HashSet(); + hostsReader.getHostDetails(hostsList, excludeList); + + return !hostsList.isEmpty() && !hostsList.contains(hostName) + && !hostsList.contains(ip) && !excludeList.contains(hostName) + && !excludeList.contains(ip); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org