From: asuresh@apache.org
To: common-commits@hadoop.apache.org
Date: Sun, 06 Dec 2015 07:13:35 -0000
Subject: [33/38] hadoop git commit: HDFS-9214. Support reconfiguring dfs.datanode.balance.max.concurrent.moves without DN restart. (Contributed by Xiaobing Zhou)

HDFS-9214. Support reconfiguring dfs.datanode.balance.max.concurrent.moves
without DN restart. (Contributed by Xiaobing Zhou)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9d817fa1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9d817fa1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9d817fa1

Branch: refs/heads/yarn-2877
Commit: 9d817fa1b14b477e5440ae4edd78de849976d9b5
Parents: 59dbe8b
Author: Arpit Agarwal
Authored: Fri Dec 4 14:46:46 2015 -0800
Committer: Arpit Agarwal
Committed: Fri Dec 4 14:46:46 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hadoop/hdfs/server/datanode/DataNode.java   |  40 ++-
 .../hdfs/server/datanode/DataXceiverServer.java |  58 +++--
 .../datanode/TestDataNodeReconfiguration.java   | 241 +++++++++++++++++++
 .../apache/hadoop/hdfs/tools/TestDFSAdmin.java  |   2 +-
 5 files changed, 319 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
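Once this commit is in place, an operator would exercise the new knob roughly as follows; the hostname, IPC port, and property value below are placeholders for illustration, not part of the commit:

    <!-- hdfs-site.xml on the DataNode; 20 is an example value -->
    <property>
      <name>dfs.datanode.balance.max.concurrent.moves</name>
      <value>20</value>
    </property>

    # Ask the running DataNode to pick up the change, then poll for completion;
    # no DataNode restart is required.
    hdfs dfsadmin -reconfig datanode dn1.example.com:50020 start
    hdfs dfsadmin -reconfig datanode dn1.example.com:50020 status
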
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d817fa1/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 34c3ff2..e10450d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1720,6 +1720,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9474. TestPipelinesFailover should not fail when printing debug
     message. (John Zhuge via Yongjun Zhang)
 
+    HDFS-9214. Support reconfiguring dfs.datanode.balance.max.concurrent.moves
+    without DN restart. (Xiaobing Zhou via Arpit Agarwal)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d817fa1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 0a68758..150ce6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -42,6 +42,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFA
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
@@ -92,7 +94,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.ObjectName;
 
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -212,6 +213,7 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import com.google.protobuf.BlockingService;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -284,7 +286,9 @@ public class DataNode extends ReconfigurableBase
   /** A list of property that are reconfigurable at runtime. */
   private static final List<String> RECONFIGURABLE_PROPERTIES =
       Collections.unmodifiableList(
-          Arrays.asList(DFS_DATANODE_DATA_DIR_KEY));
+          Arrays.asList(
+              DFS_DATANODE_DATA_DIR_KEY,
+              DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY));
 
   public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
 
@@ -522,6 +526,38 @@ public class DataNode extends ReconfigurableBase
           }
         }
       }
+    } else if (property.equals(
+        DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY)) {
+      ReconfigurationException rootException = null;
+      try {
+        LOG.info("Reconfiguring " + property + " to " + newVal);
+        int movers;
+        if (newVal == null) {
+          // set to default
+          movers = DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
+        } else {
+          movers = Integer.parseInt(newVal);
+          if (movers <= 0) {
+            rootException = new ReconfigurationException(
+                property,
+                newVal,
+                getConf().get(property),
+                new IllegalArgumentException(
+                    "balancer max concurrent movers must be larger than 0"));
+          }
+        }
+        xserver.updateBalancerMaxConcurrentMovers(movers);
+      } catch(NumberFormatException nfe) {
+        rootException = new ReconfigurationException(
+            property, newVal, getConf().get(property), nfe);
+      } finally {
+        if (rootException != null) {
+          LOG.warn(String.format(
+              "Exception in updating balancer max concurrent movers %s to %s",
+              property, newVal), rootException);
+          throw rootException;
+        }
+      }
     } else {
       throw new ReconfigurationException(
           property, newVal, getConf().get(property));
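Distilled from the hunk above, the parse-and-validate step reduces to a small helper; the following standalone sketch mirrors that logic only for illustration (the class and method names here are invented and do not appear in the commit):

    // Hypothetical helper mirroring reconfigurePropertyImpl above: a null
    // value reverts to the default, non-numeric input surfaces as
    // NumberFormatException, and non-positive values are rejected.
    class BalancerMoversConfig {
      static int parseMovers(String newVal, int defaultMovers) {
        if (newVal == null) {
          return defaultMovers;
        }
        int movers = Integer.parseInt(newVal); // NumberFormatException on junk
        if (movers <= 0) {
          throw new IllegalArgumentException(
              "balancer max concurrent movers must be larger than 0");
        }
        return movers;
      }
    }
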
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d817fa1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
index 36852eb..36cf8a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.nio.channels.AsynchronousCloseException;
 import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -31,6 +32,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.slf4j.Logger;
 
 /**
@@ -64,36 +66,45 @@ class DataXceiverServer implements Runnable {
    */
   static class BlockBalanceThrottler extends DataTransferThrottler {
     private int numThreads;
-    private int maxThreads;
-
+    private final AtomicInteger maxThreads = new AtomicInteger(0);
+
     /**Constructor
      *
      * @param bandwidth Total amount of bandwidth can be used for balancing
      */
-    private BlockBalanceThrottler(long bandwidth, int maxThreads) {
-      super(bandwidth);
-      this.maxThreads = maxThreads;
-      LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
-      LOG.info("Number threads for balancing is "+ maxThreads);
-    }
-
+    private BlockBalanceThrottler(long bandwidth, int maxThreads) {
+      super(bandwidth);
+      this.maxThreads.set(maxThreads);
+      LOG.info("Balancing bandwith is " + bandwidth + " bytes/s");
+      LOG.info("Number threads for balancing is " + maxThreads);
+    }
+
+    private void setMaxConcurrentMovers(int movers) {
+      this.maxThreads.set(movers);
+    }
+
+    @VisibleForTesting
+    int getMaxConcurrentMovers() {
+      return this.maxThreads.get();
+    }
+
     /** Check if the block move can start.
      *
      * Return true if the thread quota is not exceeded and
      * the counter is incremented; False otherwise.
      */
-    synchronized boolean acquire() {
-      if (numThreads >= maxThreads) {
-        return false;
-      }
-      numThreads++;
-      return true;
-    }
-
-    /** Mark that the move is completed. The thread counter is decremented. */
-    synchronized void release() {
-      numThreads--;
-    }
+    synchronized boolean acquire() {
+      if (numThreads >= maxThreads.get()) {
+        return false;
+      }
+      numThreads++;
+      return true;
+    }
+
+    /** Mark that the move is completed. The thread counter is decremented. */
+    synchronized void release() {
+      numThreads--;
+    }
   }
 
   final BlockBalanceThrottler balanceThrottler;
@@ -108,7 +119,6 @@ class DataXceiverServer implements Runnable {
 
   DataXceiverServer(PeerServer peerServer, Configuration conf,
       DataNode datanode) {
-
     this.peerServer = peerServer;
     this.datanode = datanode;
@@ -288,4 +298,8 @@ class DataXceiverServer implements Runnable {
     peers.remove(peer);
     peersXceiver.remove(peer);
   }
+
+  public void updateBalancerMaxConcurrentMovers(int movers) {
+    balanceThrottler.setMaxConcurrentMovers(movers);
+  }
 }
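Stripped of HDFS specifics, the throttler change above is a counting quota whose bound can move at runtime: the bound lives in an AtomicInteger so the reconfiguration thread can update it without taking the lock that guards the in-flight counter. A minimal standalone sketch of the same pattern (class and method names invented here for illustration):

    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical distillation of BlockBalanceThrottler's quota logic.
    class MoveQuota {
      private final AtomicInteger maxThreads;
      private int numThreads; // guarded by 'this'

      MoveQuota(int maxThreads) {
        this.maxThreads = new AtomicInteger(maxThreads);
      }

      // Safe to call concurrently with acquire()/release().
      void setMax(int movers) {
        maxThreads.set(movers);
      }

      // One slot per in-flight block move; false when the quota is exhausted.
      synchronized boolean acquire() {
        if (numThreads >= maxThreads.get()) {
          return false;
        }
        numThreads++;
        return true;
      }

      synchronized void release() {
        numThreads--;
      }
    }

Note that lowering the bound below the number of in-flight moves does not interrupt them; acquire() simply refuses new moves until enough release() calls bring the counter back under the new bound, which is the behavior the tests below depend on.
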
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d817fa1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
new file mode 100644
index 0000000..edaf7ab
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
@@ -0,0 +1,241 @@
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test to reconfigure some parameters for DataNode without restart
+ */
+public class TestDataNodeReconfiguration {
+
+  private static final Log LOG = LogFactory.getLog(TestBlockRecovery.class);
+  private static final String DATA_DIR = MiniDFSCluster.getBaseDirectory()
+      + "data";
+  private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
+      "localhost", 5020);
+  private final int NUM_NAME_NODE = 1;
+  private final int NUM_DATA_NODE = 10;
+  private MiniDFSCluster cluster;
+
+  @Before
+  public void Setup() throws IOException {
+    startDFSCluster(NUM_NAME_NODE, NUM_DATA_NODE);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+
+    File dir = new File(DATA_DIR);
+    if (dir.exists())
+      Assert.assertTrue("Cannot delete data-node dirs",
+          FileUtil.fullyDelete(dir));
+  }
+
+  private void startDFSCluster(int numNameNodes, int numDataNodes)
+      throws IOException {
+    Configuration conf = new Configuration();
+
+    MiniDFSNNTopology nnTopology = MiniDFSNNTopology
+        .simpleFederatedTopology(numNameNodes);
+
+    cluster = new MiniDFSCluster.Builder(conf).nnTopology(nnTopology)
+        .numDataNodes(numDataNodes).build();
+    cluster.waitActive();
+  }
+
+  /**
+   * Starts an instance of DataNode
+   *
+   * @throws IOException
+   */
+  public DataNode[] createDNsForTest(int numDateNode) throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
+    conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+
+    DataNode[] result = new DataNode[numDateNode];
+    for (int i = 0; i < numDateNode; i++) {
+      result[i] = DataNodeTestUtils.startDNWithMockNN(conf, NN_ADDR, DATA_DIR);
+    }
+    return result;
+  }
+
+  @Test
+  public void testMaxConcurrentMoversReconfiguration()
+      throws ReconfigurationException, IOException {
+    int maxConcurrentMovers = 10;
+    for (int i = 0; i < NUM_DATA_NODE; i++) {
+      DataNode dn = cluster.getDataNodes().get(i);
+
+      // try invalid values
+      try {
+        dn.reconfigureProperty(
+            DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, "text");
+        fail("ReconfigurationException expected");
+      } catch (ReconfigurationException expected) {
+        assertTrue("expecting NumberFormatException",
+            expected.getCause() instanceof NumberFormatException);
+      }
+      try {
+        dn.reconfigureProperty(
+            DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+            String.valueOf(-1));
+        fail("ReconfigurationException expected");
+      } catch (ReconfigurationException expected) {
+        assertTrue("expecting IllegalArgumentException",
+            expected.getCause() instanceof IllegalArgumentException);
+      }
+      try {
+        dn.reconfigureProperty(
+            DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+            String.valueOf(0));
+        fail("ReconfigurationException expected");
+      } catch (ReconfigurationException expected) {
+        assertTrue("expecting IllegalArgumentException",
+            expected.getCause() instanceof IllegalArgumentException);
+      }
+
+      // change properties
+      dn.reconfigureProperty(DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+          String.valueOf(maxConcurrentMovers));
+
+      // verify change
+      assertEquals(String.format("%s has wrong value",
+          DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY),
+          maxConcurrentMovers,
+          dn.xserver.balanceThrottler.getMaxConcurrentMovers());
+
+      assertEquals(String.format("%s has wrong value",
+          DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY),
+          maxConcurrentMovers, Integer.parseInt(dn.getConf().get(
+              DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY)));
+
+      // revert to default
+      dn.reconfigureProperty(DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+          null);
+
+      // verify default
+      assertEquals(String.format("%s has wrong value",
+          DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY),
+          DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT,
+          dn.xserver.balanceThrottler.getMaxConcurrentMovers());
+
+      assertEquals(String.format("expect %s is not configured",
+          DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY), null, dn
+          .getConf().get(DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY));
+    }
+  }
+
+  @Test
+  public void testAcquireWithMaxConcurrentMoversGreaterThanDefault()
+      throws IOException, ReconfigurationException {
+    testAcquireWithMaxConcurrentMoversShared(10);
+  }
+
+  @Test
+  public void testAcquireWithMaxConcurrentMoversLessThanDefault()
+      throws IOException, ReconfigurationException {
+    testAcquireWithMaxConcurrentMoversShared(3);
+  }
+
+  private void testAcquireWithMaxConcurrentMoversShared(
+      int maxConcurrentMovers)
+      throws IOException, ReconfigurationException {
+    DataNode[] dns = null;
+    try {
+      dns = createDNsForTest(1);
+      testAcquireOnMaxConcurrentMoversReconfiguration(dns[0],
+          maxConcurrentMovers);
+    } catch (IOException ioe) {
+      throw ioe;
+    } catch (ReconfigurationException re) {
+      throw re;
+    } finally {
+      shutDownDNs(dns);
+    }
+  }
+
+  private void shutDownDNs(DataNode[] dns) {
+    if (dns == null) {
+      return;
+    }
+
+    for (int i = 0; i < dns.length; i++) {
+      try {
+        if (dns[i] == null) {
+          continue;
+        }
+        dns[i].shutdown();
+      } catch (Exception e) {
+        LOG.error("Cannot close: ", e);
+      }
+    }
+  }
+
+  private void testAcquireOnMaxConcurrentMoversReconfiguration(
+      DataNode dataNode, int maxConcurrentMovers) throws IOException,
+      ReconfigurationException {
+    int defaultMaxThreads = dataNode.getConf().getInt(
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+    for (int i = 0; i < defaultMaxThreads; i++) {
+      assertEquals("should be able to get thread quota", true,
+          dataNode.xserver.balanceThrottler.acquire());
+    }
+
+    assertEquals("should not be able to get thread quota", false,
+        dataNode.xserver.balanceThrottler.acquire());
+
+    // change properties
+    dataNode.reconfigureProperty(
+        DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+        String.valueOf(maxConcurrentMovers));
+
+    // verify the new thread quota
+    assertEquals("thread quota is wrong", maxConcurrentMovers,
+        dataNode.xserver.balanceThrottler.getMaxConcurrentMovers());
+
+    int val = Math.abs(maxConcurrentMovers - defaultMaxThreads);
+    if (defaultMaxThreads < maxConcurrentMovers) {
+      for (int i = 0; i < val; i++) {
+        assertEquals("should be able to get thread quota", true,
+            dataNode.xserver.balanceThrottler.acquire());
+      }
+    } else if (defaultMaxThreads > maxConcurrentMovers) {
+      for (int i = 0; i < val; i++) {
+        assertEquals("should not be able to get thread quota", false,
+            dataNode.xserver.balanceThrottler.acquire());
+      }
+    }
+
+    assertEquals("should not be able to get thread quota", false,
+        dataNode.xserver.balanceThrottler.acquire());
+  }
+}
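To run just this suite locally, something like the following should work from the hadoop-hdfs-project/hadoop-hdfs module, assuming a standard Maven/surefire setup:

    mvn test -Dtest=TestDataNodeReconfiguration
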
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d817fa1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index a2b5638..3a30ccf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -207,7 +207,7 @@ public class TestDFSAdmin {
     final String address = "localhost:" + port;
     List<String> outputs =
         getReconfigurationAllowedProperties("datanode", address);
-    assertEquals(2, outputs.size());
+    assertEquals(3, outputs.size());
     assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
         outputs.get(1));
   }
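The updated size expectation corresponds to what the admin tool reports once the new key joins RECONFIGURABLE_PROPERTIES; listing the reconfigurable properties of a running DataNode looks roughly like this (host and port are placeholders, and the exact output format may vary by release):

    hdfs dfsadmin -reconfig datanode dn1.example.com:50020 properties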